polars_python/map/
mod.rs

1pub mod dataframe;
2pub mod lazy;
3pub mod series;
4
5use std::collections::BTreeMap;
6
7use arrow::bitmap::BitmapBuilder;
8use polars::chunked_array::builder::get_list_builder;
9use polars::prelude::*;
10use polars_core::POOL;
11use polars_core::utils::CustomIterTools;
12use polars_utils::pl_str::PlSmallStr;
13use pyo3::exceptions::PyValueError;
14use pyo3::prelude::*;
15use pyo3::pybacked::PyBackedStr;
16use pyo3::types::PyDict;
17use rayon::prelude::*;
18
19use crate::error::PyPolarsErr;
20use crate::prelude::ObjectValue;
21use crate::utils::EnterPolarsExt;
22use crate::{PySeries, Wrap};
23
24pub trait PyPolarsNumericType: PolarsNumericType {}
25
26impl PyPolarsNumericType for UInt8Type {}
27impl PyPolarsNumericType for UInt16Type {}
28impl PyPolarsNumericType for UInt32Type {}
29impl PyPolarsNumericType for UInt64Type {}
30impl PyPolarsNumericType for UInt128Type {}
31impl PyPolarsNumericType for Int8Type {}
32impl PyPolarsNumericType for Int16Type {}
33impl PyPolarsNumericType for Int32Type {}
34impl PyPolarsNumericType for Int64Type {}
35impl PyPolarsNumericType for Int128Type {}
36impl PyPolarsNumericType for Float32Type {}
37impl PyPolarsNumericType for Float64Type {}
38
39pub(super) fn check_nested_object(dt: &DataType) -> PyResult<()> {
40    if dt.contains_objects() {
41        Err(PyValueError::new_err(
42            "nested objects are not allowed\n\nSet `return_dtype=polars.Object` to use Python's native nesting.",
43        ))
44    } else {
45        Ok(())
46    }
47}
48
49fn iterator_to_struct<'py>(
50    py: Python<'py>,
51    it: impl Iterator<Item = PyResult<Option<Bound<'py, PyAny>>>>,
52    init_null_count: usize,
53    first_value: AnyValue<'static>,
54    name: PlSmallStr,
55    capacity: usize,
56) -> PyResult<PySeries> {
57    let (vals, flds) = match &first_value {
58        av @ AnyValue::Struct(_, _, flds) => (av._iter_struct_av().collect::<Vec<_>>(), &**flds),
59        AnyValue::StructOwned(payload) => (payload.0.clone(), &*payload.1),
60        _ => {
61            return Err(crate::exceptions::ComputeError::new_err(format!(
62                "expected struct got {first_value:?}",
63            )));
64        },
65    };
66
67    // Every item in the struct is kept as its own buffer of AnyValues.
68    // So a struct with 2 items: {a, b} will have:
69    // [
70    //      [ a values ]
71    //      [ b values ]
72    // ]
73    let mut struct_fields: BTreeMap<PlSmallStr, Vec<AnyValue>> = BTreeMap::new();
74
75    // As a BTreeMap sorts its keys, we also need to track the original
76    // order of the field names.
77    let mut field_names_ordered: Vec<PlSmallStr> = Vec::with_capacity(flds.len());
78
79    // Use the first value and the known null count to initialize the buffers
80    // if we find a new key later on, we make a new entry in the BTree.
81    for (value, fld) in vals.into_iter().zip(flds) {
82        let mut buf = Vec::with_capacity(capacity);
83        buf.extend((0..init_null_count).map(|_| AnyValue::Null));
84        buf.push(value);
85        field_names_ordered.push(fld.name().clone());
86        struct_fields.insert(fld.name().clone(), buf);
87    }
88
89    let mut validity = BitmapBuilder::with_capacity(capacity);
90    validity.extend_constant(init_null_count, false);
91    validity.push(true);
92
93    for dict in it {
94        match dict? {
95            None => {
96                validity.push(false);
97                for field_items in struct_fields.values_mut() {
98                    field_items.push(AnyValue::Null);
99                }
100            },
101            Some(dict) => {
102                validity.push(true);
103                let dict = dict.downcast::<PyDict>()?;
104                let current_len = struct_fields
105                    .values()
106                    .next()
107                    .map(|buf| buf.len())
108                    .unwrap_or(0);
109
110                // We ignore the keys of the rest of the dicts,
111                // the first item determines the output name.
112                for (key, val) in dict.iter() {
113                    let key = key.str().unwrap().extract::<PyBackedStr>().unwrap();
114                    let item = val.extract::<Wrap<AnyValue>>()?;
115                    if let Some(buf) = struct_fields.get_mut(&*key) {
116                        buf.push(item.0);
117                    } else {
118                        let mut buf = Vec::with_capacity(capacity);
119                        buf.extend((0..init_null_count + current_len).map(|_| AnyValue::Null));
120                        buf.push(item.0);
121                        let key: PlSmallStr = (&*key).into();
122                        field_names_ordered.push(key.clone());
123                        struct_fields.insert(key, buf);
124                    };
125                }
126
127                // Add nulls to keys that were not in the dict.
128                if dict.len() < struct_fields.len() {
129                    let current_len = current_len + 1;
130                    for buf in struct_fields.values_mut() {
131                        if buf.len() < current_len {
132                            buf.push(AnyValue::Null)
133                        }
134                    }
135                }
136            },
137        }
138    }
139
140    let fields = py.enter_polars_ok(|| {
141        POOL.install(|| {
142            field_names_ordered
143                .par_iter()
144                .map(|name| Series::new(name.clone(), struct_fields.get(name).unwrap()))
145                .collect::<Vec<_>>()
146        })
147    })?;
148
149    Ok(
150        StructChunked::from_series(name, fields[0].len(), fields.iter())
151            .unwrap()
152            .with_outer_validity(validity.into_opt_validity())
153            .into_series()
154            .into(),
155    )
156}
157
158fn iterator_to_primitive<T>(
159    it: impl Iterator<Item = PyResult<Option<T::Native>>>,
160    init_null_count: usize,
161    first_value: Option<T::Native>,
162    name: PlSmallStr,
163    capacity: usize,
164) -> PyResult<ChunkedArray<T>>
165where
166    T: PyPolarsNumericType,
167{
168    let mut error = None;
169    // SAFETY: we know the iterators len.
170    let ca: ChunkedArray<T> = unsafe {
171        if init_null_count > 0 {
172            (0..init_null_count)
173                .map(|_| Ok(None))
174                .chain(std::iter::once(Ok(first_value)))
175                .chain(it)
176                .trust_my_length(capacity)
177                .map(|v| catch_err(&mut error, v))
178                .collect_trusted()
179        } else if first_value.is_some() {
180            std::iter::once(Ok(first_value))
181                .chain(it)
182                .trust_my_length(capacity)
183                .map(|v| catch_err(&mut error, v))
184                .collect_trusted()
185        } else {
186            it.map(|v| catch_err(&mut error, v)).collect()
187        }
188    };
189    debug_assert_eq!(ca.len(), capacity);
190
191    if let Some(err) = error {
192        let _ = err?;
193    }
194    Ok(ca.with_name(name))
195}
196
197fn iterator_to_bool(
198    it: impl Iterator<Item = PyResult<Option<bool>>>,
199    init_null_count: usize,
200    first_value: Option<bool>,
201    name: PlSmallStr,
202    capacity: usize,
203) -> PyResult<ChunkedArray<BooleanType>> {
204    let mut error = None;
205    // SAFETY: we know the iterators len.
206    let ca: BooleanChunked = unsafe {
207        if init_null_count > 0 {
208            (0..init_null_count)
209                .map(|_| Ok(None))
210                .chain(std::iter::once(Ok(first_value)))
211                .chain(it)
212                .trust_my_length(capacity)
213                .map(|v| catch_err(&mut error, v))
214                .collect_trusted()
215        } else if first_value.is_some() {
216            std::iter::once(Ok(first_value))
217                .chain(it)
218                .trust_my_length(capacity)
219                .map(|v| catch_err(&mut error, v))
220                .collect_trusted()
221        } else {
222            it.map(|v| catch_err(&mut error, v)).collect()
223        }
224    };
225    if let Some(err) = error {
226        let _ = err?;
227    }
228    debug_assert_eq!(ca.len(), capacity);
229    Ok(ca.with_name(name))
230}
231
/// Collect optional Python objects into an `ObjectChunked<ObjectValue>` named
/// `name`.
///
/// Layout of the result:
/// - `init_null_count > 0`: that many nulls, then `first_value`, then the
///   items of `it`.
/// - `init_null_count == 0` and `first_value` is `Some`: `first_value`, then
///   `it`.
/// - Otherwise: only `it`.
///
/// An item that is an `Err` is stored as null and collection continues. The
/// first such error is returned once the array has been built.
#[cfg(feature = "object")]
fn iterator_to_object(
    it: impl Iterator<Item = PyResult<Option<ObjectValue>>>,
    init_null_count: usize,
    first_value: Option<ObjectValue>,
    name: PlSmallStr,
    capacity: usize,
) -> PyResult<ObjectChunked<ObjectValue>> {
    let mut first_err = None;

    let ca: ObjectChunked<ObjectValue> = if init_null_count == 0 && first_value.is_none() {
        // Nothing to prepend: a plain collect needs no length promise.
        it.map(|item| catch_err(&mut first_err, item)).collect()
    } else if init_null_count == 0 {
        // SAFETY: `capacity` is relied upon to be the exact number of items
        // yielded by `first_value` followed by `it`.
        unsafe {
            std::iter::once(Ok(first_value))
                .chain(it)
                .map(|item| catch_err(&mut first_err, item))
                .trust_my_length(capacity)
                .collect_trusted()
        }
    } else {
        // SAFETY: `capacity` is relied upon to be the exact number of items
        // yielded by the leading nulls, `first_value` and `it` together.
        unsafe {
            (0..init_null_count)
                .map(|_| Ok(None))
                .chain(std::iter::once(Ok(first_value)))
                .chain(it)
                .map(|item| catch_err(&mut first_err, item))
                .trust_my_length(capacity)
                .collect_trusted()
        }
    };

    if let Some(Err(err)) = first_err {
        return Err(err);
    }
    debug_assert_eq!(ca.len(), capacity);
    Ok(ca.with_name(name))
}
267
268fn catch_err<K>(error: &mut Option<PyResult<Option<K>>>, result: PyResult<Option<K>>) -> Option<K> {
269    match result {
270        Ok(item) => item,
271        err => {
272            if error.is_none() {
273                *error = Some(err);
274            }
275            None
276        },
277    }
278}
279
280fn iterator_to_string<S: AsRef<str>>(
281    it: impl Iterator<Item = PyResult<Option<S>>>,
282    init_null_count: usize,
283    first_value: Option<S>,
284    name: PlSmallStr,
285    capacity: usize,
286) -> PyResult<StringChunked> {
287    let mut error = None;
288    // SAFETY: we know the iterators len.
289    let ca: StringChunked = unsafe {
290        if init_null_count > 0 {
291            (0..init_null_count)
292                .map(|_| Ok(None))
293                .chain(std::iter::once(Ok(first_value)))
294                .trust_my_length(capacity)
295                .map(|v| catch_err(&mut error, v))
296                .collect_trusted()
297        } else if first_value.is_some() {
298            std::iter::once(Ok(first_value))
299                .chain(it)
300                .trust_my_length(capacity)
301                .map(|v| catch_err(&mut error, v))
302                .collect_trusted()
303        } else {
304            it.map(|v| catch_err(&mut error, v)).collect()
305        }
306    };
307    debug_assert_eq!(ca.len(), capacity);
308    if let Some(err) = error {
309        let _ = err?;
310    }
311    Ok(ca.with_name(name))
312}
313
314fn iterator_to_list(
315    dt: &DataType,
316    it: impl Iterator<Item = PyResult<Option<Series>>>,
317    init_null_count: usize,
318    first_value: Option<&Series>,
319    name: PlSmallStr,
320    capacity: usize,
321) -> PyResult<ListChunked> {
322    let mut builder = get_list_builder(dt, capacity * 5, capacity, name);
323    for _ in 0..init_null_count {
324        builder.append_null()
325    }
326    if first_value.is_some() {
327        builder
328            .append_opt_series(first_value)
329            .map_err(PyPolarsErr::from)?;
330    }
331    for opt_val in it {
332        match opt_val? {
333            None => builder.append_null(),
334            Some(s) => {
335                if s.is_empty() && s.dtype() != dt {
336                    builder
337                        .append_series(&Series::full_null(PlSmallStr::EMPTY, 0, dt))
338                        .unwrap()
339                } else {
340                    builder.append_series(&s).map_err(PyPolarsErr::from)?
341                }
342            },
343        }
344    }
345    Ok(builder.finish())
346}