1use polars::frame::row::{Row, rows_to_schema_supertypes, rows_to_supertypes};
2use polars::prelude::*;
3use pyo3::prelude::*;
4use pyo3::types::{PyDict, PyMapping, PyString};
5
6use super::PyDataFrame;
7use crate::conversion::any_value::py_object_to_any_value;
8use crate::conversion::{Wrap, vec_extract_wrapped};
9use crate::error::PyPolarsErr;
10use crate::interop;
11use crate::utils::EnterPolarsExt;
12
13#[pymethods]
14impl PyDataFrame {
15 #[staticmethod]
16 #[pyo3(signature = (data, schema=None, infer_schema_length=None))]
17 pub fn from_rows(
18 py: Python<'_>,
19 data: Vec<Wrap<Row>>,
20 schema: Option<Wrap<Schema>>,
21 infer_schema_length: Option<usize>,
22 ) -> PyResult<Self> {
23 let data = vec_extract_wrapped(data);
24 let schema = schema.map(|wrap| wrap.0);
25 py.enter_polars(move || finish_from_rows(data, schema, None, infer_schema_length))
26 }
27
28 #[staticmethod]
29 #[pyo3(signature = (data, schema=None, schema_overrides=None, strict=true, infer_schema_length=None))]
30 pub fn from_dicts(
31 py: Python<'_>,
32 data: &Bound<PyAny>,
33 schema: Option<Wrap<Schema>>,
34 schema_overrides: Option<Wrap<Schema>>,
35 strict: bool,
36 infer_schema_length: Option<usize>,
37 ) -> PyResult<Self> {
38 let schema = schema.map(|wrap| wrap.0);
39 let schema_overrides = schema_overrides.map(|wrap| wrap.0);
40
41 let from_mapping = data.len()? > 0 && {
44 let mut iter = data.try_iter()?;
45 loop {
46 match iter.next() {
47 Some(Ok(item)) if !item.is_none() => break !item.is_instance_of::<PyDict>(),
48 Some(Err(e)) => return Err(e),
49 Some(_) => continue,
50 None => break false,
51 }
52 }
53 };
54
55 let names = get_schema_names(data, schema.as_ref(), infer_schema_length, from_mapping)?;
57 let rows = if from_mapping {
58 mappings_to_rows(data, &names, strict)?
59 } else {
60 dicts_to_rows(data, &names, strict)?
61 };
62
63 let schema = schema.or_else(|| {
64 Some(columns_names_to_empty_schema(
65 names.iter().map(String::as_str),
66 ))
67 });
68 py.enter_polars(move || {
69 finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
70 })
71 }
72
73 #[staticmethod]
74 pub fn from_arrow_record_batches(
75 py: Python<'_>,
76 rb: Vec<Bound<PyAny>>,
77 schema: Bound<PyAny>,
78 ) -> PyResult<Self> {
79 let df = interop::arrow::to_rust::to_rust_df(py, &rb, schema)?;
80 Ok(Self::from(df))
81 }
82}
83
84fn finish_from_rows(
85 rows: Vec<Row>,
86 schema: Option<Schema>,
87 schema_overrides: Option<Schema>,
88 infer_schema_length: Option<usize>,
89) -> PyResult<PyDataFrame> {
90 let schema = if let Some(mut schema) = schema {
91 resolve_schema_overrides(&mut schema, schema_overrides);
92 update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;
93 schema
94 } else {
95 rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?
96 };
97
98 let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;
99 Ok(df.into())
100}
101
102fn update_schema_from_rows(
103 schema: &mut Schema,
104 rows: &[Row],
105 infer_schema_length: Option<usize>,
106) -> PyResult<()> {
107 let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known());
108 if schema_is_complete {
109 return Ok(());
110 }
111
112 let inferred_dtypes =
114 rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;
115 let inferred_dtypes_slice = inferred_dtypes.as_slice();
116
117 for (i, dtype) in schema.iter_values_mut().enumerate() {
118 if !dtype.is_known() {
119 *dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {
120 polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")
121 })
122 .map_err(PyPolarsErr::from)?
123 .clone();
124 }
125 }
126 Ok(())
127}
128
129fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) {
133 if let Some(overrides) = schema_overrides {
134 for (name, dtype) in overrides.into_iter() {
135 schema.set_dtype(name.as_str(), dtype);
136 }
137 }
138}
139
140fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema
141where
142 I: IntoIterator<Item = &'a str>,
143{
144 let fields = column_names
145 .into_iter()
146 .map(|c| Field::new(c.into(), DataType::Unknown(Default::default())));
147 Schema::from_iter(fields)
148}
149
150fn dicts_to_rows(
151 data: &Bound<'_, PyAny>,
152 names: &[String],
153 strict: bool,
154) -> PyResult<Vec<Row<'static>>> {
155 let py = data.py();
156 let mut rows = Vec::with_capacity(data.len()?);
157 let null_row = Row::new(vec![AnyValue::Null; names.len()]);
158
159 let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();
161
162 for d in data.try_iter()? {
163 let d = d?;
164 if d.is_none() {
165 rows.push(null_row.clone())
166 } else {
167 let d = d.downcast::<PyDict>()?;
168 let mut row = Vec::with_capacity(names.len());
169 for k in &py_keys {
170 let val = match d.get_item(k)? {
171 None => AnyValue::Null,
172 Some(py_val) => py_object_to_any_value(&py_val.as_borrowed(), strict, true)?,
173 };
174 row.push(val)
175 }
176 rows.push(Row(row))
177 }
178 }
179 Ok(rows)
180}
181
182fn mappings_to_rows(
183 data: &Bound<'_, PyAny>,
184 names: &[String],
185 strict: bool,
186) -> PyResult<Vec<Row<'static>>> {
187 let py = data.py();
188 let mut rows = Vec::with_capacity(data.len()?);
189 let null_row = Row::new(vec![AnyValue::Null; names.len()]);
190
191 let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();
193
194 for d in data.try_iter()? {
195 let d = d?;
196 if d.is_none() {
197 rows.push(null_row.clone())
198 } else {
199 let d = d.downcast::<PyMapping>()?;
200 let mut row = Vec::with_capacity(names.len());
201 for k in &py_keys {
202 let py_val = d.get_item(k)?;
203 let val = if py_val.is_none() {
204 AnyValue::Null
205 } else {
206 py_object_to_any_value(&py_val, strict, true)?
207 };
208 row.push(val)
209 }
210 rows.push(Row(row))
211 }
212 }
213 Ok(rows)
214}
215
216fn get_schema_names(
218 data: &Bound<PyAny>,
219 schema: Option<&Schema>,
220 infer_schema_length: Option<usize>,
221 from_mapping: bool,
222) -> PyResult<Vec<String>> {
223 if let Some(schema) = schema {
224 Ok(schema.iter_names().map(|n| n.to_string()).collect())
225 } else {
226 let data_len = data.len()?;
227 let infer_schema_length = infer_schema_length
228 .map(|n| std::cmp::max(1, n))
229 .unwrap_or(data_len);
230
231 if from_mapping {
232 infer_schema_names_from_mapping_data(data, infer_schema_length)
233 } else {
234 infer_schema_names_from_dict_data(data, infer_schema_length)
235 }
236 }
237}
238
239fn infer_schema_names_from_dict_data(
244 data: &Bound<PyAny>,
245 infer_schema_length: usize,
246) -> PyResult<Vec<String>> {
247 let mut names = PlIndexSet::new();
248 for d in data.try_iter()?.take(infer_schema_length) {
249 let d = d?;
250 if !d.is_none() {
251 let d = d.downcast::<PyDict>()?;
252 let keys = d.keys().iter();
253 for name in keys {
254 let name = name.extract::<String>()?;
255 names.insert(name);
256 }
257 }
258 }
259 Ok(names.into_iter().collect())
260}
261
262fn infer_schema_names_from_mapping_data(
267 data: &Bound<PyAny>,
268 infer_schema_length: usize,
269) -> PyResult<Vec<String>> {
270 let mut names = PlIndexSet::new();
271 for d in data.try_iter()?.take(infer_schema_length) {
272 let d = d?;
273 if !d.is_none() {
274 let d = d.downcast::<PyMapping>()?;
275 let keys = d.keys()?;
276 for name in keys {
277 let name = name.extract::<String>()?;
278 names.insert(name);
279 }
280 }
281 }
282 Ok(names.into_iter().collect())
283}