polars_python/dataframe/
construction.rs

1use polars::frame::row::{rows_to_schema_supertypes, rows_to_supertypes, Row};
2use polars::prelude::*;
3use pyo3::prelude::*;
4use pyo3::types::PyDict;
5
6use super::PyDataFrame;
7use crate::conversion::any_value::py_object_to_any_value;
8use crate::conversion::{vec_extract_wrapped, Wrap};
9use crate::error::PyPolarsErr;
10use crate::interop;
11
12#[pymethods]
13impl PyDataFrame {
14    #[staticmethod]
15    #[pyo3(signature = (data, schema=None, infer_schema_length=None))]
16    pub fn from_rows(
17        py: Python,
18        data: Vec<Wrap<Row>>,
19        schema: Option<Wrap<Schema>>,
20        infer_schema_length: Option<usize>,
21    ) -> PyResult<Self> {
22        let data = vec_extract_wrapped(data);
23        let schema = schema.map(|wrap| wrap.0);
24        py.allow_threads(move || finish_from_rows(data, schema, None, infer_schema_length))
25    }
26
27    #[staticmethod]
28    #[pyo3(signature = (data, schema=None, schema_overrides=None, strict=true, infer_schema_length=None))]
29    pub fn from_dicts(
30        py: Python,
31        data: &Bound<PyAny>,
32        schema: Option<Wrap<Schema>>,
33        schema_overrides: Option<Wrap<Schema>>,
34        strict: bool,
35        infer_schema_length: Option<usize>,
36    ) -> PyResult<Self> {
37        let schema = schema.map(|wrap| wrap.0);
38        let schema_overrides = schema_overrides.map(|wrap| wrap.0);
39
40        let names = get_schema_names(data, schema.as_ref(), infer_schema_length)?;
41        let rows = dicts_to_rows(data, &names, strict)?;
42
43        let schema = schema.or_else(|| {
44            Some(columns_names_to_empty_schema(
45                names.iter().map(String::as_str),
46            ))
47        });
48
49        py.allow_threads(move || {
50            finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
51        })
52    }
53
54    #[staticmethod]
55    pub fn from_arrow_record_batches(
56        py: Python,
57        rb: Vec<Bound<PyAny>>,
58        schema: Bound<PyAny>,
59    ) -> PyResult<Self> {
60        let df = interop::arrow::to_rust::to_rust_df(py, &rb, schema)?;
61        Ok(Self::from(df))
62    }
63}
64
65fn finish_from_rows(
66    rows: Vec<Row>,
67    schema: Option<Schema>,
68    schema_overrides: Option<Schema>,
69    infer_schema_length: Option<usize>,
70) -> PyResult<PyDataFrame> {
71    let mut schema = if let Some(mut schema) = schema {
72        resolve_schema_overrides(&mut schema, schema_overrides);
73        update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;
74        schema
75    } else {
76        rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?
77    };
78
79    // TODO: Remove this step when Decimals are supported properly.
80    // Erasing the decimal precision/scale here will just require us to infer it again later.
81    // https://github.com/pola-rs/polars/issues/14427
82    erase_decimal_precision_scale(&mut schema);
83
84    let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;
85    Ok(df.into())
86}
87
88fn update_schema_from_rows(
89    schema: &mut Schema,
90    rows: &[Row],
91    infer_schema_length: Option<usize>,
92) -> PyResult<()> {
93    let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known());
94    if schema_is_complete {
95        return Ok(());
96    }
97
98    // TODO: Only infer dtypes for columns with an unknown dtype
99    let inferred_dtypes =
100        rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;
101    let inferred_dtypes_slice = inferred_dtypes.as_slice();
102
103    for (i, dtype) in schema.iter_values_mut().enumerate() {
104        if !dtype.is_known() {
105            *dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {
106                polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")
107            })
108            .map_err(PyPolarsErr::from)?
109            .clone();
110        }
111    }
112    Ok(())
113}
114
115/// Override the data type of certain schema fields.
116///
117/// Overrides for nonexistent columns are ignored.
118fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) {
119    if let Some(overrides) = schema_overrides {
120        for (name, dtype) in overrides.into_iter() {
121            schema.set_dtype(name.as_str(), dtype);
122        }
123    }
124}
125
126/// Erase precision/scale information from Decimal types.
127fn erase_decimal_precision_scale(schema: &mut Schema) {
128    for dtype in schema.iter_values_mut() {
129        if let DataType::Decimal(_, _) = dtype {
130            *dtype = DataType::Decimal(None, None)
131        }
132    }
133}
134
135fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema
136where
137    I: IntoIterator<Item = &'a str>,
138{
139    let fields = column_names
140        .into_iter()
141        .map(|c| Field::new(c.into(), DataType::Unknown(Default::default())));
142    Schema::from_iter(fields)
143}
144
145fn dicts_to_rows<'a>(
146    data: &Bound<'a, PyAny>,
147    names: &'a [String],
148    strict: bool,
149) -> PyResult<Vec<Row<'a>>> {
150    let len = data.len()?;
151    let mut rows = Vec::with_capacity(len);
152    for d in data.try_iter()? {
153        let d = d?;
154        let d = d.downcast::<PyDict>()?;
155
156        let mut row = Vec::with_capacity(names.len());
157        for k in names.iter() {
158            let val = match d.get_item(k)? {
159                None => AnyValue::Null,
160                Some(val) => py_object_to_any_value(&val.as_borrowed(), strict, true)?,
161            };
162            row.push(val)
163        }
164        rows.push(Row(row))
165    }
166    Ok(rows)
167}
168
169/// Either read the given schema, or infer the schema names from the data.
170fn get_schema_names(
171    data: &Bound<PyAny>,
172    schema: Option<&Schema>,
173    infer_schema_length: Option<usize>,
174) -> PyResult<Vec<String>> {
175    if let Some(schema) = schema {
176        Ok(schema.iter_names().map(|n| n.to_string()).collect())
177    } else {
178        infer_schema_names_from_data(data, infer_schema_length)
179    }
180}
181
182/// Infer schema names from an iterable of dictionaries.
183///
184/// The resulting schema order is determined by the order in which the names are encountered in
185/// the data.
186fn infer_schema_names_from_data(
187    data: &Bound<PyAny>,
188    infer_schema_length: Option<usize>,
189) -> PyResult<Vec<String>> {
190    let data_len = data.len()?;
191    let infer_schema_length = infer_schema_length
192        .map(|n| std::cmp::max(1, n))
193        .unwrap_or(data_len);
194
195    let mut names = PlIndexSet::new();
196    for d in data.try_iter()?.take(infer_schema_length) {
197        let d = d?;
198        let d = d.downcast::<PyDict>()?;
199        let keys = d.keys();
200        for name in keys {
201            let name = name.extract::<String>()?;
202            names.insert(name);
203        }
204    }
205    Ok(names.into_iter().collect())
206}