polars_python/dataframe/
construction.rs1use polars::frame::row::{Row, rows_to_schema_supertypes, rows_to_supertypes};
2use polars::prelude::*;
3use pyo3::prelude::*;
4use pyo3::types::PyDict;
5
6use super::PyDataFrame;
7use crate::conversion::any_value::py_object_to_any_value;
8use crate::conversion::{Wrap, vec_extract_wrapped};
9use crate::error::PyPolarsErr;
10use crate::interop;
11use crate::utils::EnterPolarsExt;
12
13#[pymethods]
14impl PyDataFrame {
15 #[staticmethod]
16 #[pyo3(signature = (data, schema=None, infer_schema_length=None))]
17 pub fn from_rows(
18 py: Python,
19 data: Vec<Wrap<Row>>,
20 schema: Option<Wrap<Schema>>,
21 infer_schema_length: Option<usize>,
22 ) -> PyResult<Self> {
23 let data = vec_extract_wrapped(data);
24 let schema = schema.map(|wrap| wrap.0);
25 py.enter_polars(move || finish_from_rows(data, schema, None, infer_schema_length))
26 }
27
28 #[staticmethod]
29 #[pyo3(signature = (data, schema=None, schema_overrides=None, strict=true, infer_schema_length=None))]
30 pub fn from_dicts(
31 py: Python,
32 data: &Bound<PyAny>,
33 schema: Option<Wrap<Schema>>,
34 schema_overrides: Option<Wrap<Schema>>,
35 strict: bool,
36 infer_schema_length: Option<usize>,
37 ) -> PyResult<Self> {
38 let schema = schema.map(|wrap| wrap.0);
39 let schema_overrides = schema_overrides.map(|wrap| wrap.0);
40
41 let names = get_schema_names(data, schema.as_ref(), infer_schema_length)?;
42 let rows = dicts_to_rows(data, &names, strict)?;
43
44 let schema = schema.or_else(|| {
45 Some(columns_names_to_empty_schema(
46 names.iter().map(String::as_str),
47 ))
48 });
49
50 py.enter_polars(move || {
51 finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
52 })
53 }
54
55 #[staticmethod]
56 pub fn from_arrow_record_batches(
57 py: Python,
58 rb: Vec<Bound<PyAny>>,
59 schema: Bound<PyAny>,
60 ) -> PyResult<Self> {
61 let df = interop::arrow::to_rust::to_rust_df(py, &rb, schema)?;
62 Ok(Self::from(df))
63 }
64}
65
66fn finish_from_rows(
67 rows: Vec<Row>,
68 schema: Option<Schema>,
69 schema_overrides: Option<Schema>,
70 infer_schema_length: Option<usize>,
71) -> PyResult<PyDataFrame> {
72 let mut schema = if let Some(mut schema) = schema {
73 resolve_schema_overrides(&mut schema, schema_overrides);
74 update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;
75 schema
76 } else {
77 rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?
78 };
79
80 erase_decimal_precision_scale(&mut schema);
84
85 let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;
86 Ok(df.into())
87}
88
89fn update_schema_from_rows(
90 schema: &mut Schema,
91 rows: &[Row],
92 infer_schema_length: Option<usize>,
93) -> PyResult<()> {
94 let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known());
95 if schema_is_complete {
96 return Ok(());
97 }
98
99 let inferred_dtypes =
101 rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;
102 let inferred_dtypes_slice = inferred_dtypes.as_slice();
103
104 for (i, dtype) in schema.iter_values_mut().enumerate() {
105 if !dtype.is_known() {
106 *dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {
107 polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")
108 })
109 .map_err(PyPolarsErr::from)?
110 .clone();
111 }
112 }
113 Ok(())
114}
115
116fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) {
120 if let Some(overrides) = schema_overrides {
121 for (name, dtype) in overrides.into_iter() {
122 schema.set_dtype(name.as_str(), dtype);
123 }
124 }
125}
126
127fn erase_decimal_precision_scale(schema: &mut Schema) {
129 for dtype in schema.iter_values_mut() {
130 if let DataType::Decimal(_, _) = dtype {
131 *dtype = DataType::Decimal(None, None)
132 }
133 }
134}
135
136fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema
137where
138 I: IntoIterator<Item = &'a str>,
139{
140 let fields = column_names
141 .into_iter()
142 .map(|c| Field::new(c.into(), DataType::Unknown(Default::default())));
143 Schema::from_iter(fields)
144}
145
146fn dicts_to_rows<'a>(
147 data: &Bound<'a, PyAny>,
148 names: &'a [String],
149 strict: bool,
150) -> PyResult<Vec<Row<'a>>> {
151 let len = data.len()?;
152 let mut rows = Vec::with_capacity(len);
153 for d in data.try_iter()? {
154 let d = d?;
155 let d = d.downcast::<PyDict>()?;
156
157 let mut row = Vec::with_capacity(names.len());
158 for k in names.iter() {
159 let val = match d.get_item(k)? {
160 None => AnyValue::Null,
161 Some(val) => py_object_to_any_value(&val.as_borrowed(), strict, true)?,
162 };
163 row.push(val)
164 }
165 rows.push(Row(row))
166 }
167 Ok(rows)
168}
169
170fn get_schema_names(
172 data: &Bound<PyAny>,
173 schema: Option<&Schema>,
174 infer_schema_length: Option<usize>,
175) -> PyResult<Vec<String>> {
176 if let Some(schema) = schema {
177 Ok(schema.iter_names().map(|n| n.to_string()).collect())
178 } else {
179 infer_schema_names_from_data(data, infer_schema_length)
180 }
181}
182
183fn infer_schema_names_from_data(
188 data: &Bound<PyAny>,
189 infer_schema_length: Option<usize>,
190) -> PyResult<Vec<String>> {
191 let data_len = data.len()?;
192 let infer_schema_length = infer_schema_length
193 .map(|n| std::cmp::max(1, n))
194 .unwrap_or(data_len);
195
196 let mut names = PlIndexSet::new();
197 for d in data.try_iter()?.take(infer_schema_length) {
198 let d = d?;
199 let d = d.downcast::<PyDict>()?;
200 let keys = d.keys();
201 for name in keys {
202 let name = name.extract::<String>()?;
203 names.insert(name);
204 }
205 }
206 Ok(names.into_iter().collect())
207}