polars_python/dataframe/
construction.rs

1use polars::frame::row::{Row, rows_to_schema_supertypes, rows_to_supertypes};
2use polars::prelude::*;
3use pyo3::prelude::*;
4use pyo3::types::PyDict;
5
6use super::PyDataFrame;
7use crate::conversion::any_value::py_object_to_any_value;
8use crate::conversion::{Wrap, vec_extract_wrapped};
9use crate::error::PyPolarsErr;
10use crate::interop;
11use crate::utils::EnterPolarsExt;
12
13#[pymethods]
14impl PyDataFrame {
15    #[staticmethod]
16    #[pyo3(signature = (data, schema=None, infer_schema_length=None))]
17    pub fn from_rows(
18        py: Python,
19        data: Vec<Wrap<Row>>,
20        schema: Option<Wrap<Schema>>,
21        infer_schema_length: Option<usize>,
22    ) -> PyResult<Self> {
23        let data = vec_extract_wrapped(data);
24        let schema = schema.map(|wrap| wrap.0);
25        py.enter_polars(move || finish_from_rows(data, schema, None, infer_schema_length))
26    }
27
28    #[staticmethod]
29    #[pyo3(signature = (data, schema=None, schema_overrides=None, strict=true, infer_schema_length=None))]
30    pub fn from_dicts(
31        py: Python,
32        data: &Bound<PyAny>,
33        schema: Option<Wrap<Schema>>,
34        schema_overrides: Option<Wrap<Schema>>,
35        strict: bool,
36        infer_schema_length: Option<usize>,
37    ) -> PyResult<Self> {
38        let schema = schema.map(|wrap| wrap.0);
39        let schema_overrides = schema_overrides.map(|wrap| wrap.0);
40
41        let names = get_schema_names(data, schema.as_ref(), infer_schema_length)?;
42        let rows = dicts_to_rows(data, &names, strict)?;
43
44        let schema = schema.or_else(|| {
45            Some(columns_names_to_empty_schema(
46                names.iter().map(String::as_str),
47            ))
48        });
49
50        py.enter_polars(move || {
51            finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
52        })
53    }
54
55    #[staticmethod]
56    pub fn from_arrow_record_batches(
57        py: Python,
58        rb: Vec<Bound<PyAny>>,
59        schema: Bound<PyAny>,
60    ) -> PyResult<Self> {
61        let df = interop::arrow::to_rust::to_rust_df(py, &rb, schema)?;
62        Ok(Self::from(df))
63    }
64}
65
66fn finish_from_rows(
67    rows: Vec<Row>,
68    schema: Option<Schema>,
69    schema_overrides: Option<Schema>,
70    infer_schema_length: Option<usize>,
71) -> PyResult<PyDataFrame> {
72    let mut schema = if let Some(mut schema) = schema {
73        resolve_schema_overrides(&mut schema, schema_overrides);
74        update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;
75        schema
76    } else {
77        rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?
78    };
79
80    // TODO: Remove this step when Decimals are supported properly.
81    // Erasing the decimal precision/scale here will just require us to infer it again later.
82    // https://github.com/pola-rs/polars/issues/14427
83    erase_decimal_precision_scale(&mut schema);
84
85    let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;
86    Ok(df.into())
87}
88
89fn update_schema_from_rows(
90    schema: &mut Schema,
91    rows: &[Row],
92    infer_schema_length: Option<usize>,
93) -> PyResult<()> {
94    let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known());
95    if schema_is_complete {
96        return Ok(());
97    }
98
99    // TODO: Only infer dtypes for columns with an unknown dtype
100    let inferred_dtypes =
101        rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;
102    let inferred_dtypes_slice = inferred_dtypes.as_slice();
103
104    for (i, dtype) in schema.iter_values_mut().enumerate() {
105        if !dtype.is_known() {
106            *dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {
107                polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")
108            })
109            .map_err(PyPolarsErr::from)?
110            .clone();
111        }
112    }
113    Ok(())
114}
115
116/// Override the data type of certain schema fields.
117///
118/// Overrides for nonexistent columns are ignored.
119fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) {
120    if let Some(overrides) = schema_overrides {
121        for (name, dtype) in overrides.into_iter() {
122            schema.set_dtype(name.as_str(), dtype);
123        }
124    }
125}
126
127/// Erase precision/scale information from Decimal types.
128fn erase_decimal_precision_scale(schema: &mut Schema) {
129    for dtype in schema.iter_values_mut() {
130        if let DataType::Decimal(_, _) = dtype {
131            *dtype = DataType::Decimal(None, None)
132        }
133    }
134}
135
136fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema
137where
138    I: IntoIterator<Item = &'a str>,
139{
140    let fields = column_names
141        .into_iter()
142        .map(|c| Field::new(c.into(), DataType::Unknown(Default::default())));
143    Schema::from_iter(fields)
144}
145
146fn dicts_to_rows<'a>(
147    data: &Bound<'a, PyAny>,
148    names: &'a [String],
149    strict: bool,
150) -> PyResult<Vec<Row<'a>>> {
151    let len = data.len()?;
152    let mut rows = Vec::with_capacity(len);
153    for d in data.try_iter()? {
154        let d = d?;
155        let d = d.downcast::<PyDict>()?;
156
157        let mut row = Vec::with_capacity(names.len());
158        for k in names.iter() {
159            let val = match d.get_item(k)? {
160                None => AnyValue::Null,
161                Some(val) => py_object_to_any_value(&val.as_borrowed(), strict, true)?,
162            };
163            row.push(val)
164        }
165        rows.push(Row(row))
166    }
167    Ok(rows)
168}
169
170/// Either read the given schema, or infer the schema names from the data.
171fn get_schema_names(
172    data: &Bound<PyAny>,
173    schema: Option<&Schema>,
174    infer_schema_length: Option<usize>,
175) -> PyResult<Vec<String>> {
176    if let Some(schema) = schema {
177        Ok(schema.iter_names().map(|n| n.to_string()).collect())
178    } else {
179        infer_schema_names_from_data(data, infer_schema_length)
180    }
181}
182
183/// Infer schema names from an iterable of dictionaries.
184///
185/// The resulting schema order is determined by the order in which the names are encountered in
186/// the data.
187fn infer_schema_names_from_data(
188    data: &Bound<PyAny>,
189    infer_schema_length: Option<usize>,
190) -> PyResult<Vec<String>> {
191    let data_len = data.len()?;
192    let infer_schema_length = infer_schema_length
193        .map(|n| std::cmp::max(1, n))
194        .unwrap_or(data_len);
195
196    let mut names = PlIndexSet::new();
197    for d in data.try_iter()?.take(infer_schema_length) {
198        let d = d?;
199        let d = d.downcast::<PyDict>()?;
200        let keys = d.keys();
201        for name in keys {
202            let name = name.extract::<String>()?;
203            names.insert(name);
204        }
205    }
206    Ok(names.into_iter().collect())
207}