polars_python/dataframe/
construction.rs

1use polars::frame::row::{Row, rows_to_schema_supertypes, rows_to_supertypes};
2use polars::prelude::*;
3use pyo3::prelude::*;
4use pyo3::types::{PyDict, PyMapping, PyString};
5
6use super::PyDataFrame;
7use crate::conversion::any_value::py_object_to_any_value;
8use crate::conversion::{Wrap, vec_extract_wrapped};
9use crate::error::PyPolarsErr;
10use crate::interop;
11use crate::utils::EnterPolarsExt;
12
13#[pymethods]
14impl PyDataFrame {
15    #[staticmethod]
16    #[pyo3(signature = (data, schema=None, infer_schema_length=None))]
17    pub fn from_rows(
18        py: Python<'_>,
19        data: Vec<Wrap<Row>>,
20        schema: Option<Wrap<Schema>>,
21        infer_schema_length: Option<usize>,
22    ) -> PyResult<Self> {
23        let data = vec_extract_wrapped(data);
24        let schema = schema.map(|wrap| wrap.0);
25        py.enter_polars(move || finish_from_rows(data, schema, None, infer_schema_length))
26    }
27
28    #[staticmethod]
29    #[pyo3(signature = (data, schema=None, schema_overrides=None, strict=true, infer_schema_length=None))]
30    pub fn from_dicts(
31        py: Python<'_>,
32        data: &Bound<PyAny>,
33        schema: Option<Wrap<Schema>>,
34        schema_overrides: Option<Wrap<Schema>>,
35        strict: bool,
36        infer_schema_length: Option<usize>,
37    ) -> PyResult<Self> {
38        let schema = schema.map(|wrap| wrap.0);
39        let schema_overrides = schema_overrides.map(|wrap| wrap.0);
40
41        // determine row extraction strategy from the first item:
42        // PyDict (faster), or PyMapping (more generic, slower)
43        let from_mapping = data.len()? > 0 && {
44            let mut iter = data.try_iter()?;
45            loop {
46                match iter.next() {
47                    Some(Ok(item)) if !item.is_none() => break !item.is_instance_of::<PyDict>(),
48                    Some(Err(e)) => return Err(e),
49                    Some(_) => continue,
50                    None => break false,
51                }
52            }
53        };
54
55        // read (or infer) field names, then extract row values
56        let names = get_schema_names(data, schema.as_ref(), infer_schema_length, from_mapping)?;
57        let rows = if from_mapping {
58            mappings_to_rows(data, &names, strict)?
59        } else {
60            dicts_to_rows(data, &names, strict)?
61        };
62
63        let schema = schema.or_else(|| {
64            Some(columns_names_to_empty_schema(
65                names.iter().map(String::as_str),
66            ))
67        });
68        py.enter_polars(move || {
69            finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
70        })
71    }
72
73    #[staticmethod]
74    pub fn from_arrow_record_batches(
75        py: Python<'_>,
76        rb: Vec<Bound<PyAny>>,
77        schema: Bound<PyAny>,
78    ) -> PyResult<Self> {
79        let df = interop::arrow::to_rust::to_rust_df(py, &rb, schema)?;
80        Ok(Self::from(df))
81    }
82}
83
84fn finish_from_rows(
85    rows: Vec<Row>,
86    schema: Option<Schema>,
87    schema_overrides: Option<Schema>,
88    infer_schema_length: Option<usize>,
89) -> PyResult<PyDataFrame> {
90    let mut schema = if let Some(mut schema) = schema {
91        resolve_schema_overrides(&mut schema, schema_overrides);
92        update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;
93        schema
94    } else {
95        rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?
96    };
97
98    // TODO: Remove this step when Decimals are supported properly.
99    // Erasing the decimal precision/scale here will just require us to infer it again later.
100    // https://github.com/pola-rs/polars/issues/14427
101    erase_decimal_precision_scale(&mut schema);
102
103    let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;
104    Ok(df.into())
105}
106
107fn update_schema_from_rows(
108    schema: &mut Schema,
109    rows: &[Row],
110    infer_schema_length: Option<usize>,
111) -> PyResult<()> {
112    let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known());
113    if schema_is_complete {
114        return Ok(());
115    }
116
117    // TODO: Only infer dtypes for columns with an unknown dtype
118    let inferred_dtypes =
119        rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;
120    let inferred_dtypes_slice = inferred_dtypes.as_slice();
121
122    for (i, dtype) in schema.iter_values_mut().enumerate() {
123        if !dtype.is_known() {
124            *dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {
125                polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")
126            })
127            .map_err(PyPolarsErr::from)?
128            .clone();
129        }
130    }
131    Ok(())
132}
133
134/// Override the data type of certain schema fields.
135///
136/// Overrides for nonexistent columns are ignored.
137fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) {
138    if let Some(overrides) = schema_overrides {
139        for (name, dtype) in overrides.into_iter() {
140            schema.set_dtype(name.as_str(), dtype);
141        }
142    }
143}
144
145/// Erase precision/scale information from Decimal types.
146fn erase_decimal_precision_scale(schema: &mut Schema) {
147    for dtype in schema.iter_values_mut() {
148        if let DataType::Decimal(_, _) = dtype {
149            *dtype = DataType::Decimal(None, None)
150        }
151    }
152}
153
154fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema
155where
156    I: IntoIterator<Item = &'a str>,
157{
158    let fields = column_names
159        .into_iter()
160        .map(|c| Field::new(c.into(), DataType::Unknown(Default::default())));
161    Schema::from_iter(fields)
162}
163
164fn dicts_to_rows(
165    data: &Bound<'_, PyAny>,
166    names: &[String],
167    strict: bool,
168) -> PyResult<Vec<Row<'static>>> {
169    let py = data.py();
170    let mut rows = Vec::with_capacity(data.len()?);
171    let null_row = Row::new(vec![AnyValue::Null; names.len()]);
172
173    // pre-convert keys/names so we don't repeatedly create them in the loop
174    let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();
175
176    for d in data.try_iter()? {
177        let d = d?;
178        if d.is_none() {
179            rows.push(null_row.clone())
180        } else {
181            let d = d.downcast::<PyDict>()?;
182            let mut row = Vec::with_capacity(names.len());
183            for k in &py_keys {
184                let val = match d.get_item(k)? {
185                    None => AnyValue::Null,
186                    Some(py_val) => py_object_to_any_value(&py_val.as_borrowed(), strict, true)?,
187                };
188                row.push(val)
189            }
190            rows.push(Row(row))
191        }
192    }
193    Ok(rows)
194}
195
196fn mappings_to_rows(
197    data: &Bound<'_, PyAny>,
198    names: &[String],
199    strict: bool,
200) -> PyResult<Vec<Row<'static>>> {
201    let py = data.py();
202    let mut rows = Vec::with_capacity(data.len()?);
203    let null_row = Row::new(vec![AnyValue::Null; names.len()]);
204
205    // pre-convert keys/names so we don't repeatedly create them in the loop
206    let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();
207
208    for d in data.try_iter()? {
209        let d = d?;
210        if d.is_none() {
211            rows.push(null_row.clone())
212        } else {
213            let d = d.downcast::<PyMapping>()?;
214            let mut row = Vec::with_capacity(names.len());
215            for k in &py_keys {
216                let py_val = d.get_item(k)?;
217                let val = if py_val.is_none() {
218                    AnyValue::Null
219                } else {
220                    py_object_to_any_value(&py_val, strict, true)?
221                };
222                row.push(val)
223            }
224            rows.push(Row(row))
225        }
226    }
227    Ok(rows)
228}
229
230/// Either read the given schema, or infer the schema names from the data.
231fn get_schema_names(
232    data: &Bound<PyAny>,
233    schema: Option<&Schema>,
234    infer_schema_length: Option<usize>,
235    from_mapping: bool,
236) -> PyResult<Vec<String>> {
237    if let Some(schema) = schema {
238        Ok(schema.iter_names().map(|n| n.to_string()).collect())
239    } else {
240        let data_len = data.len()?;
241        let infer_schema_length = infer_schema_length
242            .map(|n| std::cmp::max(1, n))
243            .unwrap_or(data_len);
244
245        if from_mapping {
246            infer_schema_names_from_mapping_data(data, infer_schema_length)
247        } else {
248            infer_schema_names_from_dict_data(data, infer_schema_length)
249        }
250    }
251}
252
253/// Infer schema names from an iterable of dictionaries.
254///
255/// The resulting schema order is determined by the order
256/// in which the names are encountered in the data.
257fn infer_schema_names_from_dict_data(
258    data: &Bound<PyAny>,
259    infer_schema_length: usize,
260) -> PyResult<Vec<String>> {
261    let mut names = PlIndexSet::new();
262    for d in data.try_iter()?.take(infer_schema_length) {
263        let d = d?;
264        if !d.is_none() {
265            let d = d.downcast::<PyDict>()?;
266            let keys = d.keys().iter();
267            for name in keys {
268                let name = name.extract::<String>()?;
269                names.insert(name);
270            }
271        }
272    }
273    Ok(names.into_iter().collect())
274}
275
276/// Infer schema names from an iterable of mapping objects.
277///
278/// The resulting schema order is determined by the order
279/// in which the names are encountered in the data.
280fn infer_schema_names_from_mapping_data(
281    data: &Bound<PyAny>,
282    infer_schema_length: usize,
283) -> PyResult<Vec<String>> {
284    let mut names = PlIndexSet::new();
285    for d in data.try_iter()?.take(infer_schema_length) {
286        let d = d?;
287        if !d.is_none() {
288            let d = d.downcast::<PyMapping>()?;
289            let keys = d.keys()?;
290            for name in keys {
291                let name = name.extract::<String>()?;
292                names.insert(name);
293            }
294        }
295    }
296    Ok(names.into_iter().collect())
297}