polars_python/dataframe/construction.rs

1use polars::frame::row::{Row, rows_to_schema_supertypes, rows_to_supertypes};
2use polars::prelude::*;
3use pyo3::prelude::*;
4use pyo3::types::{PyDict, PyMapping, PyString};
5
6use super::PyDataFrame;
7use crate::conversion::any_value::py_object_to_any_value;
8use crate::conversion::{Wrap, vec_extract_wrapped};
9use crate::error::PyPolarsErr;
10use crate::interop;
11use crate::utils::EnterPolarsExt;
12
13#[pymethods]
14impl PyDataFrame {
15    #[staticmethod]
16    #[pyo3(signature = (data, schema=None, infer_schema_length=None))]
17    pub fn from_rows(
18        py: Python<'_>,
19        data: Vec<Wrap<Row>>,
20        schema: Option<Wrap<Schema>>,
21        infer_schema_length: Option<usize>,
22    ) -> PyResult<Self> {
23        let data = vec_extract_wrapped(data);
24        let schema = schema.map(|wrap| wrap.0);
25        py.enter_polars(move || finish_from_rows(data, schema, None, infer_schema_length))
26    }
27
28    #[staticmethod]
29    #[pyo3(signature = (data, schema=None, schema_overrides=None, strict=true, infer_schema_length=None))]
30    pub fn from_dicts(
31        py: Python<'_>,
32        data: &Bound<PyAny>,
33        schema: Option<Wrap<Schema>>,
34        schema_overrides: Option<Wrap<Schema>>,
35        strict: bool,
36        infer_schema_length: Option<usize>,
37    ) -> PyResult<Self> {
38        let schema = schema.map(|wrap| wrap.0);
39        let schema_overrides = schema_overrides.map(|wrap| wrap.0);
40
41        // determine row extraction strategy from the first item:
42        // PyDict (faster), or PyMapping (more generic, slower)
43        let from_mapping = data.len()? > 0 && {
44            let mut iter = data.try_iter()?;
45            loop {
46                match iter.next() {
47                    Some(Ok(item)) if !item.is_none() => break !item.is_instance_of::<PyDict>(),
48                    Some(Err(e)) => return Err(e),
49                    Some(_) => continue,
50                    None => break false,
51                }
52            }
53        };
54
55        // read (or infer) field names, then extract row values
56        let names = get_schema_names(data, schema.as_ref(), infer_schema_length, from_mapping)?;
57        let rows = if from_mapping {
58            mappings_to_rows(data, &names, strict)?
59        } else {
60            dicts_to_rows(data, &names, strict)?
61        };
62
63        let schema = schema.or_else(|| {
64            Some(columns_names_to_empty_schema(
65                names.iter().map(String::as_str),
66            ))
67        });
68        py.enter_polars(move || {
69            finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
70        })
71    }
72
73    #[staticmethod]
74    pub fn from_arrow_record_batches(
75        py: Python<'_>,
76        rb: Vec<Bound<PyAny>>,
77        schema: Bound<PyAny>,
78    ) -> PyResult<Self> {
79        let df = interop::arrow::to_rust::to_rust_df(py, &rb, schema)?;
80        Ok(Self::from(df))
81    }
82}
83
84fn finish_from_rows(
85    rows: Vec<Row>,
86    schema: Option<Schema>,
87    schema_overrides: Option<Schema>,
88    infer_schema_length: Option<usize>,
89) -> PyResult<PyDataFrame> {
90    let schema = if let Some(mut schema) = schema {
91        resolve_schema_overrides(&mut schema, schema_overrides);
92        update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;
93        schema
94    } else {
95        rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?
96    };
97
98    let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;
99    Ok(df.into())
100}
101
102fn update_schema_from_rows(
103    schema: &mut Schema,
104    rows: &[Row],
105    infer_schema_length: Option<usize>,
106) -> PyResult<()> {
107    let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known());
108    if schema_is_complete {
109        return Ok(());
110    }
111
112    // TODO: Only infer dtypes for columns with an unknown dtype
113    let inferred_dtypes =
114        rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;
115    let inferred_dtypes_slice = inferred_dtypes.as_slice();
116
117    for (i, dtype) in schema.iter_values_mut().enumerate() {
118        if !dtype.is_known() {
119            *dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {
120                polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")
121            })
122            .map_err(PyPolarsErr::from)?
123            .clone();
124        }
125    }
126    Ok(())
127}
128
129/// Override the data type of certain schema fields.
130///
131/// Overrides for nonexistent columns are ignored.
132fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) {
133    if let Some(overrides) = schema_overrides {
134        for (name, dtype) in overrides.into_iter() {
135            schema.set_dtype(name.as_str(), dtype);
136        }
137    }
138}
139
140fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema
141where
142    I: IntoIterator<Item = &'a str>,
143{
144    let fields = column_names
145        .into_iter()
146        .map(|c| Field::new(c.into(), DataType::Unknown(Default::default())));
147    Schema::from_iter(fields)
148}
149
150fn dicts_to_rows(
151    data: &Bound<'_, PyAny>,
152    names: &[String],
153    strict: bool,
154) -> PyResult<Vec<Row<'static>>> {
155    let py = data.py();
156    let mut rows = Vec::with_capacity(data.len()?);
157    let null_row = Row::new(vec![AnyValue::Null; names.len()]);
158
159    // pre-convert keys/names so we don't repeatedly create them in the loop
160    let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();
161
162    for d in data.try_iter()? {
163        let d = d?;
164        if d.is_none() {
165            rows.push(null_row.clone())
166        } else {
167            let d = d.downcast::<PyDict>()?;
168            let mut row = Vec::with_capacity(names.len());
169            for k in &py_keys {
170                let val = match d.get_item(k)? {
171                    None => AnyValue::Null,
172                    Some(py_val) => py_object_to_any_value(&py_val.as_borrowed(), strict, true)?,
173                };
174                row.push(val)
175            }
176            rows.push(Row(row))
177        }
178    }
179    Ok(rows)
180}
181
182fn mappings_to_rows(
183    data: &Bound<'_, PyAny>,
184    names: &[String],
185    strict: bool,
186) -> PyResult<Vec<Row<'static>>> {
187    let py = data.py();
188    let mut rows = Vec::with_capacity(data.len()?);
189    let null_row = Row::new(vec![AnyValue::Null; names.len()]);
190
191    // pre-convert keys/names so we don't repeatedly create them in the loop
192    let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();
193
194    for d in data.try_iter()? {
195        let d = d?;
196        if d.is_none() {
197            rows.push(null_row.clone())
198        } else {
199            let d = d.downcast::<PyMapping>()?;
200            let mut row = Vec::with_capacity(names.len());
201            for k in &py_keys {
202                let py_val = d.get_item(k)?;
203                let val = if py_val.is_none() {
204                    AnyValue::Null
205                } else {
206                    py_object_to_any_value(&py_val, strict, true)?
207                };
208                row.push(val)
209            }
210            rows.push(Row(row))
211        }
212    }
213    Ok(rows)
214}
215
216/// Either read the given schema, or infer the schema names from the data.
217fn get_schema_names(
218    data: &Bound<PyAny>,
219    schema: Option<&Schema>,
220    infer_schema_length: Option<usize>,
221    from_mapping: bool,
222) -> PyResult<Vec<String>> {
223    if let Some(schema) = schema {
224        Ok(schema.iter_names().map(|n| n.to_string()).collect())
225    } else {
226        let data_len = data.len()?;
227        let infer_schema_length = infer_schema_length
228            .map(|n| std::cmp::max(1, n))
229            .unwrap_or(data_len);
230
231        if from_mapping {
232            infer_schema_names_from_mapping_data(data, infer_schema_length)
233        } else {
234            infer_schema_names_from_dict_data(data, infer_schema_length)
235        }
236    }
237}
238
239/// Infer schema names from an iterable of dictionaries.
240///
241/// The resulting schema order is determined by the order
242/// in which the names are encountered in the data.
243fn infer_schema_names_from_dict_data(
244    data: &Bound<PyAny>,
245    infer_schema_length: usize,
246) -> PyResult<Vec<String>> {
247    let mut names = PlIndexSet::new();
248    for d in data.try_iter()?.take(infer_schema_length) {
249        let d = d?;
250        if !d.is_none() {
251            let d = d.downcast::<PyDict>()?;
252            let keys = d.keys().iter();
253            for name in keys {
254                let name = name.extract::<String>()?;
255                names.insert(name);
256            }
257        }
258    }
259    Ok(names.into_iter().collect())
260}
261
262/// Infer schema names from an iterable of mapping objects.
263///
264/// The resulting schema order is determined by the order
265/// in which the names are encountered in the data.
266fn infer_schema_names_from_mapping_data(
267    data: &Bound<PyAny>,
268    infer_schema_length: usize,
269) -> PyResult<Vec<String>> {
270    let mut names = PlIndexSet::new();
271    for d in data.try_iter()?.take(infer_schema_length) {
272        let d = d?;
273        if !d.is_none() {
274            let d = d.downcast::<PyMapping>()?;
275            let keys = d.keys()?;
276            for name in keys {
277                let name = name.extract::<String>()?;
278                names.insert(name);
279            }
280        }
281    }
282    Ok(names.into_iter().collect())
283}