1use polars::frame::row::{Row, rows_to_schema_supertypes, rows_to_supertypes};
2use polars::prelude::*;
3use pyo3::prelude::*;
4use pyo3::types::{PyDict, PyMapping, PyString};
5
6use super::PyDataFrame;
7use crate::conversion::any_value::py_object_to_any_value;
8use crate::conversion::{Wrap, vec_extract_wrapped};
9use crate::error::PyPolarsErr;
10use crate::interop;
11use crate::utils::EnterPolarsExt;
12
13#[pymethods]
14impl PyDataFrame {
15 #[staticmethod]
16 #[pyo3(signature = (data, schema=None, infer_schema_length=None))]
17 pub fn from_rows(
18 py: Python<'_>,
19 data: Vec<Wrap<Row>>,
20 schema: Option<Wrap<Schema>>,
21 infer_schema_length: Option<usize>,
22 ) -> PyResult<Self> {
23 let data = vec_extract_wrapped(data);
24 let schema = schema.map(|wrap| wrap.0);
25 py.enter_polars(move || finish_from_rows(data, schema, None, infer_schema_length))
26 }
27
28 #[staticmethod]
29 #[pyo3(signature = (data, schema=None, schema_overrides=None, strict=true, infer_schema_length=None))]
30 pub fn from_dicts(
31 py: Python<'_>,
32 data: &Bound<PyAny>,
33 schema: Option<Wrap<Schema>>,
34 schema_overrides: Option<Wrap<Schema>>,
35 strict: bool,
36 infer_schema_length: Option<usize>,
37 ) -> PyResult<Self> {
38 let schema = schema.map(|wrap| wrap.0);
39 let schema_overrides = schema_overrides.map(|wrap| wrap.0);
40
41 let from_mapping = data.len()? > 0 && {
44 let mut iter = data.try_iter()?;
45 loop {
46 match iter.next() {
47 Some(Ok(item)) if !item.is_none() => break !item.is_instance_of::<PyDict>(),
48 Some(Err(e)) => return Err(e),
49 Some(_) => continue,
50 None => break false,
51 }
52 }
53 };
54
55 let names = get_schema_names(data, schema.as_ref(), infer_schema_length, from_mapping)?;
57 let rows = if from_mapping {
58 mappings_to_rows(data, &names, strict)?
59 } else {
60 dicts_to_rows(data, &names, strict)?
61 };
62
63 let schema = schema.or_else(|| {
64 Some(columns_names_to_empty_schema(
65 names.iter().map(String::as_str),
66 ))
67 });
68 py.enter_polars(move || {
69 finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
70 })
71 }
72
73 #[staticmethod]
74 pub fn from_arrow_record_batches(
75 py: Python<'_>,
76 rb: Vec<Bound<PyAny>>,
77 schema: Bound<PyAny>,
78 ) -> PyResult<Self> {
79 let df = interop::arrow::to_rust::to_rust_df(py, &rb, schema)?;
80 Ok(Self::from(df))
81 }
82}
83
84fn finish_from_rows(
85 rows: Vec<Row>,
86 schema: Option<Schema>,
87 schema_overrides: Option<Schema>,
88 infer_schema_length: Option<usize>,
89) -> PyResult<PyDataFrame> {
90 let mut schema = if let Some(mut schema) = schema {
91 resolve_schema_overrides(&mut schema, schema_overrides);
92 update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;
93 schema
94 } else {
95 rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?
96 };
97
98 erase_decimal_precision_scale(&mut schema);
102
103 let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;
104 Ok(df.into())
105}
106
107fn update_schema_from_rows(
108 schema: &mut Schema,
109 rows: &[Row],
110 infer_schema_length: Option<usize>,
111) -> PyResult<()> {
112 let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known());
113 if schema_is_complete {
114 return Ok(());
115 }
116
117 let inferred_dtypes =
119 rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;
120 let inferred_dtypes_slice = inferred_dtypes.as_slice();
121
122 for (i, dtype) in schema.iter_values_mut().enumerate() {
123 if !dtype.is_known() {
124 *dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {
125 polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")
126 })
127 .map_err(PyPolarsErr::from)?
128 .clone();
129 }
130 }
131 Ok(())
132}
133
134fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) {
138 if let Some(overrides) = schema_overrides {
139 for (name, dtype) in overrides.into_iter() {
140 schema.set_dtype(name.as_str(), dtype);
141 }
142 }
143}
144
145fn erase_decimal_precision_scale(schema: &mut Schema) {
147 for dtype in schema.iter_values_mut() {
148 if let DataType::Decimal(_, _) = dtype {
149 *dtype = DataType::Decimal(None, None)
150 }
151 }
152}
153
154fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema
155where
156 I: IntoIterator<Item = &'a str>,
157{
158 let fields = column_names
159 .into_iter()
160 .map(|c| Field::new(c.into(), DataType::Unknown(Default::default())));
161 Schema::from_iter(fields)
162}
163
164fn dicts_to_rows(
165 data: &Bound<'_, PyAny>,
166 names: &[String],
167 strict: bool,
168) -> PyResult<Vec<Row<'static>>> {
169 let py = data.py();
170 let mut rows = Vec::with_capacity(data.len()?);
171 let null_row = Row::new(vec![AnyValue::Null; names.len()]);
172
173 let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();
175
176 for d in data.try_iter()? {
177 let d = d?;
178 if d.is_none() {
179 rows.push(null_row.clone())
180 } else {
181 let d = d.downcast::<PyDict>()?;
182 let mut row = Vec::with_capacity(names.len());
183 for k in &py_keys {
184 let val = match d.get_item(k)? {
185 None => AnyValue::Null,
186 Some(py_val) => py_object_to_any_value(&py_val.as_borrowed(), strict, true)?,
187 };
188 row.push(val)
189 }
190 rows.push(Row(row))
191 }
192 }
193 Ok(rows)
194}
195
196fn mappings_to_rows(
197 data: &Bound<'_, PyAny>,
198 names: &[String],
199 strict: bool,
200) -> PyResult<Vec<Row<'static>>> {
201 let py = data.py();
202 let mut rows = Vec::with_capacity(data.len()?);
203 let null_row = Row::new(vec![AnyValue::Null; names.len()]);
204
205 let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();
207
208 for d in data.try_iter()? {
209 let d = d?;
210 if d.is_none() {
211 rows.push(null_row.clone())
212 } else {
213 let d = d.downcast::<PyMapping>()?;
214 let mut row = Vec::with_capacity(names.len());
215 for k in &py_keys {
216 let py_val = d.get_item(k)?;
217 let val = if py_val.is_none() {
218 AnyValue::Null
219 } else {
220 py_object_to_any_value(&py_val, strict, true)?
221 };
222 row.push(val)
223 }
224 rows.push(Row(row))
225 }
226 }
227 Ok(rows)
228}
229
230fn get_schema_names(
232 data: &Bound<PyAny>,
233 schema: Option<&Schema>,
234 infer_schema_length: Option<usize>,
235 from_mapping: bool,
236) -> PyResult<Vec<String>> {
237 if let Some(schema) = schema {
238 Ok(schema.iter_names().map(|n| n.to_string()).collect())
239 } else {
240 let data_len = data.len()?;
241 let infer_schema_length = infer_schema_length
242 .map(|n| std::cmp::max(1, n))
243 .unwrap_or(data_len);
244
245 if from_mapping {
246 infer_schema_names_from_mapping_data(data, infer_schema_length)
247 } else {
248 infer_schema_names_from_dict_data(data, infer_schema_length)
249 }
250 }
251}
252
253fn infer_schema_names_from_dict_data(
258 data: &Bound<PyAny>,
259 infer_schema_length: usize,
260) -> PyResult<Vec<String>> {
261 let mut names = PlIndexSet::new();
262 for d in data.try_iter()?.take(infer_schema_length) {
263 let d = d?;
264 if !d.is_none() {
265 let d = d.downcast::<PyDict>()?;
266 let keys = d.keys().iter();
267 for name in keys {
268 let name = name.extract::<String>()?;
269 names.insert(name);
270 }
271 }
272 }
273 Ok(names.into_iter().collect())
274}
275
276fn infer_schema_names_from_mapping_data(
281 data: &Bound<PyAny>,
282 infer_schema_length: usize,
283) -> PyResult<Vec<String>> {
284 let mut names = PlIndexSet::new();
285 for d in data.try_iter()?.take(infer_schema_length) {
286 let d = d?;
287 if !d.is_none() {
288 let d = d.downcast::<PyMapping>()?;
289 let keys = d.keys()?;
290 for name in keys {
291 let name = name.extract::<String>()?;
292 names.insert(name);
293 }
294 }
295 }
296 Ok(names.into_iter().collect())
297}