polars_python/dataframe/
construction.rs1use polars::frame::row::{rows_to_schema_supertypes, rows_to_supertypes, Row};
2use polars::prelude::*;
3use pyo3::prelude::*;
4use pyo3::types::PyDict;
5
6use super::PyDataFrame;
7use crate::conversion::any_value::py_object_to_any_value;
8use crate::conversion::{vec_extract_wrapped, Wrap};
9use crate::error::PyPolarsErr;
10use crate::interop;
11
12#[pymethods]
13impl PyDataFrame {
14 #[staticmethod]
15 #[pyo3(signature = (data, schema=None, infer_schema_length=None))]
16 pub fn from_rows(
17 py: Python,
18 data: Vec<Wrap<Row>>,
19 schema: Option<Wrap<Schema>>,
20 infer_schema_length: Option<usize>,
21 ) -> PyResult<Self> {
22 let data = vec_extract_wrapped(data);
23 let schema = schema.map(|wrap| wrap.0);
24 py.allow_threads(move || finish_from_rows(data, schema, None, infer_schema_length))
25 }
26
27 #[staticmethod]
28 #[pyo3(signature = (data, schema=None, schema_overrides=None, strict=true, infer_schema_length=None))]
29 pub fn from_dicts(
30 py: Python,
31 data: &Bound<PyAny>,
32 schema: Option<Wrap<Schema>>,
33 schema_overrides: Option<Wrap<Schema>>,
34 strict: bool,
35 infer_schema_length: Option<usize>,
36 ) -> PyResult<Self> {
37 let schema = schema.map(|wrap| wrap.0);
38 let schema_overrides = schema_overrides.map(|wrap| wrap.0);
39
40 let names = get_schema_names(data, schema.as_ref(), infer_schema_length)?;
41 let rows = dicts_to_rows(data, &names, strict)?;
42
43 let schema = schema.or_else(|| {
44 Some(columns_names_to_empty_schema(
45 names.iter().map(String::as_str),
46 ))
47 });
48
49 py.allow_threads(move || {
50 finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
51 })
52 }
53
54 #[staticmethod]
55 pub fn from_arrow_record_batches(
56 py: Python,
57 rb: Vec<Bound<PyAny>>,
58 schema: Bound<PyAny>,
59 ) -> PyResult<Self> {
60 let df = interop::arrow::to_rust::to_rust_df(py, &rb, schema)?;
61 Ok(Self::from(df))
62 }
63}
64
65fn finish_from_rows(
66 rows: Vec<Row>,
67 schema: Option<Schema>,
68 schema_overrides: Option<Schema>,
69 infer_schema_length: Option<usize>,
70) -> PyResult<PyDataFrame> {
71 let mut schema = if let Some(mut schema) = schema {
72 resolve_schema_overrides(&mut schema, schema_overrides);
73 update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;
74 schema
75 } else {
76 rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?
77 };
78
79 erase_decimal_precision_scale(&mut schema);
83
84 let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;
85 Ok(df.into())
86}
87
88fn update_schema_from_rows(
89 schema: &mut Schema,
90 rows: &[Row],
91 infer_schema_length: Option<usize>,
92) -> PyResult<()> {
93 let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known());
94 if schema_is_complete {
95 return Ok(());
96 }
97
98 let inferred_dtypes =
100 rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;
101 let inferred_dtypes_slice = inferred_dtypes.as_slice();
102
103 for (i, dtype) in schema.iter_values_mut().enumerate() {
104 if !dtype.is_known() {
105 *dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {
106 polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")
107 })
108 .map_err(PyPolarsErr::from)?
109 .clone();
110 }
111 }
112 Ok(())
113}
114
115fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) {
119 if let Some(overrides) = schema_overrides {
120 for (name, dtype) in overrides.into_iter() {
121 schema.set_dtype(name.as_str(), dtype);
122 }
123 }
124}
125
126fn erase_decimal_precision_scale(schema: &mut Schema) {
128 for dtype in schema.iter_values_mut() {
129 if let DataType::Decimal(_, _) = dtype {
130 *dtype = DataType::Decimal(None, None)
131 }
132 }
133}
134
135fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema
136where
137 I: IntoIterator<Item = &'a str>,
138{
139 let fields = column_names
140 .into_iter()
141 .map(|c| Field::new(c.into(), DataType::Unknown(Default::default())));
142 Schema::from_iter(fields)
143}
144
145fn dicts_to_rows<'a>(
146 data: &Bound<'a, PyAny>,
147 names: &'a [String],
148 strict: bool,
149) -> PyResult<Vec<Row<'a>>> {
150 let len = data.len()?;
151 let mut rows = Vec::with_capacity(len);
152 for d in data.try_iter()? {
153 let d = d?;
154 let d = d.downcast::<PyDict>()?;
155
156 let mut row = Vec::with_capacity(names.len());
157 for k in names.iter() {
158 let val = match d.get_item(k)? {
159 None => AnyValue::Null,
160 Some(val) => py_object_to_any_value(&val.as_borrowed(), strict, true)?,
161 };
162 row.push(val)
163 }
164 rows.push(Row(row))
165 }
166 Ok(rows)
167}
168
169fn get_schema_names(
171 data: &Bound<PyAny>,
172 schema: Option<&Schema>,
173 infer_schema_length: Option<usize>,
174) -> PyResult<Vec<String>> {
175 if let Some(schema) = schema {
176 Ok(schema.iter_names().map(|n| n.to_string()).collect())
177 } else {
178 infer_schema_names_from_data(data, infer_schema_length)
179 }
180}
181
182fn infer_schema_names_from_data(
187 data: &Bound<PyAny>,
188 infer_schema_length: Option<usize>,
189) -> PyResult<Vec<String>> {
190 let data_len = data.len()?;
191 let infer_schema_length = infer_schema_length
192 .map(|n| std::cmp::max(1, n))
193 .unwrap_or(data_len);
194
195 let mut names = PlIndexSet::new();
196 for d in data.try_iter()?.take(infer_schema_length) {
197 let d = d?;
198 let d = d.downcast::<PyDict>()?;
199 let keys = d.keys();
200 for name in keys {
201 let name = name.extract::<String>()?;
202 names.insert(name);
203 }
204 }
205 Ok(names.into_iter().collect())
206}