polars_python/map/mod.rs

1pub mod dataframe;
2pub mod lazy;
3pub mod series;
4
5use std::collections::BTreeMap;
6
7use arrow::bitmap::BitmapBuilder;
8use polars::chunked_array::builder::get_list_builder;
9use polars::prelude::*;
10use polars_core::POOL;
11use polars_core::utils::CustomIterTools;
12use polars_utils::pl_str::PlSmallStr;
13use pyo3::exceptions::PyValueError;
14use pyo3::prelude::*;
15use pyo3::pybacked::PyBackedStr;
16use pyo3::types::PyDict;
17use rayon::prelude::*;
18
19use crate::error::PyPolarsErr;
20use crate::prelude::ObjectValue;
21use crate::utils::EnterPolarsExt;
22use crate::{PySeries, Wrap};
23
24pub trait PyPolarsNumericType: PolarsNumericType {}
25
26impl PyPolarsNumericType for UInt8Type {}
27impl PyPolarsNumericType for UInt16Type {}
28impl PyPolarsNumericType for UInt32Type {}
29impl PyPolarsNumericType for UInt64Type {}
30impl PyPolarsNumericType for Int8Type {}
31impl PyPolarsNumericType for Int16Type {}
32impl PyPolarsNumericType for Int32Type {}
33impl PyPolarsNumericType for Int64Type {}
34impl PyPolarsNumericType for Int128Type {}
35impl PyPolarsNumericType for Float32Type {}
36impl PyPolarsNumericType for Float64Type {}
37
38pub(super) fn check_nested_object(dt: &DataType) -> PyResult<()> {
39    if dt.contains_objects() {
40        Err(PyValueError::new_err(
41            "nested objects are not allowed\n\nSet `return_dtype=polars.Object` to use Python's native nesting.",
42        ))
43    } else {
44        Ok(())
45    }
46}
47
48fn iterator_to_struct<'py>(
49    py: Python<'py>,
50    it: impl Iterator<Item = PyResult<Option<Bound<'py, PyAny>>>>,
51    init_null_count: usize,
52    first_value: AnyValue<'static>,
53    name: PlSmallStr,
54    capacity: usize,
55) -> PyResult<PySeries> {
56    let (vals, flds) = match &first_value {
57        av @ AnyValue::Struct(_, _, flds) => (av._iter_struct_av().collect::<Vec<_>>(), &**flds),
58        AnyValue::StructOwned(payload) => (payload.0.clone(), &*payload.1),
59        _ => {
60            return Err(crate::exceptions::ComputeError::new_err(format!(
61                "expected struct got {first_value:?}",
62            )));
63        },
64    };
65
66    // Every item in the struct is kept as its own buffer of AnyValues.
67    // So a struct with 2 items: {a, b} will have:
68    // [
69    //      [ a values ]
70    //      [ b values ]
71    // ]
72    let mut struct_fields: BTreeMap<PlSmallStr, Vec<AnyValue>> = BTreeMap::new();
73
74    // As a BTreeMap sorts its keys, we also need to track the original
75    // order of the field names.
76    let mut field_names_ordered: Vec<PlSmallStr> = Vec::with_capacity(flds.len());
77
78    // Use the first value and the known null count to initialize the buffers
79    // if we find a new key later on, we make a new entry in the BTree.
80    for (value, fld) in vals.into_iter().zip(flds) {
81        let mut buf = Vec::with_capacity(capacity);
82        buf.extend((0..init_null_count).map(|_| AnyValue::Null));
83        buf.push(value);
84        field_names_ordered.push(fld.name().clone());
85        struct_fields.insert(fld.name().clone(), buf);
86    }
87
88    let mut validity = BitmapBuilder::with_capacity(capacity);
89    validity.extend_constant(init_null_count, false);
90    validity.push(true);
91
92    for dict in it {
93        match dict? {
94            None => {
95                validity.push(false);
96                for field_items in struct_fields.values_mut() {
97                    field_items.push(AnyValue::Null);
98                }
99            },
100            Some(dict) => {
101                validity.push(true);
102                let dict = dict.downcast::<PyDict>()?;
103                let current_len = struct_fields
104                    .values()
105                    .next()
106                    .map(|buf| buf.len())
107                    .unwrap_or(0);
108
109                // We ignore the keys of the rest of the dicts,
110                // the first item determines the output name.
111                for (key, val) in dict.iter() {
112                    let key = key.str().unwrap().extract::<PyBackedStr>().unwrap();
113                    let item = val.extract::<Wrap<AnyValue>>()?;
114                    if let Some(buf) = struct_fields.get_mut(&*key) {
115                        buf.push(item.0);
116                    } else {
117                        let mut buf = Vec::with_capacity(capacity);
118                        buf.extend((0..init_null_count + current_len).map(|_| AnyValue::Null));
119                        buf.push(item.0);
120                        let key: PlSmallStr = (&*key).into();
121                        field_names_ordered.push(key.clone());
122                        struct_fields.insert(key, buf);
123                    };
124                }
125
126                // Add nulls to keys that were not in the dict.
127                if dict.len() < struct_fields.len() {
128                    let current_len = current_len + 1;
129                    for buf in struct_fields.values_mut() {
130                        if buf.len() < current_len {
131                            buf.push(AnyValue::Null)
132                        }
133                    }
134                }
135            },
136        }
137    }
138
139    let fields = py.enter_polars_ok(|| {
140        POOL.install(|| {
141            field_names_ordered
142                .par_iter()
143                .map(|name| Series::new(name.clone(), struct_fields.get(name).unwrap()))
144                .collect::<Vec<_>>()
145        })
146    })?;
147
148    Ok(
149        StructChunked::from_series(name, fields[0].len(), fields.iter())
150            .unwrap()
151            .with_outer_validity(validity.into_opt_validity())
152            .into_series()
153            .into(),
154    )
155}
156
157fn iterator_to_primitive<T>(
158    it: impl Iterator<Item = PyResult<Option<T::Native>>>,
159    init_null_count: usize,
160    first_value: Option<T::Native>,
161    name: PlSmallStr,
162    capacity: usize,
163) -> PyResult<ChunkedArray<T>>
164where
165    T: PyPolarsNumericType,
166{
167    let mut error = None;
168    // SAFETY: we know the iterators len.
169    let ca: ChunkedArray<T> = unsafe {
170        if init_null_count > 0 {
171            (0..init_null_count)
172                .map(|_| Ok(None))
173                .chain(std::iter::once(Ok(first_value)))
174                .chain(it)
175                .trust_my_length(capacity)
176                .map(|v| catch_err(&mut error, v))
177                .collect_trusted()
178        } else if first_value.is_some() {
179            std::iter::once(Ok(first_value))
180                .chain(it)
181                .trust_my_length(capacity)
182                .map(|v| catch_err(&mut error, v))
183                .collect_trusted()
184        } else {
185            it.map(|v| catch_err(&mut error, v)).collect()
186        }
187    };
188    debug_assert_eq!(ca.len(), capacity);
189
190    if let Some(err) = error {
191        let _ = err?;
192    }
193    Ok(ca.with_name(name))
194}
195
196fn iterator_to_bool(
197    it: impl Iterator<Item = PyResult<Option<bool>>>,
198    init_null_count: usize,
199    first_value: Option<bool>,
200    name: PlSmallStr,
201    capacity: usize,
202) -> PyResult<ChunkedArray<BooleanType>> {
203    let mut error = None;
204    // SAFETY: we know the iterators len.
205    let ca: BooleanChunked = unsafe {
206        if init_null_count > 0 {
207            (0..init_null_count)
208                .map(|_| Ok(None))
209                .chain(std::iter::once(Ok(first_value)))
210                .chain(it)
211                .trust_my_length(capacity)
212                .map(|v| catch_err(&mut error, v))
213                .collect_trusted()
214        } else if first_value.is_some() {
215            std::iter::once(Ok(first_value))
216                .chain(it)
217                .trust_my_length(capacity)
218                .map(|v| catch_err(&mut error, v))
219                .collect_trusted()
220        } else {
221            it.map(|v| catch_err(&mut error, v)).collect()
222        }
223    };
224    if let Some(err) = error {
225        let _ = err?;
226    }
227    debug_assert_eq!(ca.len(), capacity);
228    Ok(ca.with_name(name))
229}
230
/// Materialises Python objects into an `ObjectChunked<ObjectValue>`.
///
/// Row order is: `init_null_count` nulls, then `first_value`, then every item
/// of `it`. When there are no leading nulls and `first_value` is `None`, only
/// `it` is collected. The first `Err` item is remembered and returned after
/// the iterator has been fully drained (failing items become nulls meanwhile).
#[cfg(feature = "object")]
fn iterator_to_object(
    it: impl Iterator<Item = PyResult<Option<ObjectValue>>>,
    init_null_count: usize,
    first_value: Option<ObjectValue>,
    name: PlSmallStr,
    capacity: usize,
) -> PyResult<ObjectChunked<ObjectValue>> {
    let mut first_err = None;

    let ca: ObjectChunked<ObjectValue> = if init_null_count == 0 && first_value.is_none() {
        it.map(|v| catch_err(&mut first_err, v)).collect()
    } else {
        let rows = std::iter::repeat_with(|| Ok(None))
            .take(init_null_count)
            .chain(std::iter::once(Ok(first_value)))
            .chain(it)
            .map(|v| catch_err(&mut first_err, v));
        // SAFETY: relies on `capacity` being exactly the number of items
        // yielded by `rows` (leading nulls + first value + `it`).
        unsafe { rows.trust_my_length(capacity) }.collect_trusted()
    };

    if let Some(Err(e)) = first_err {
        return Err(e);
    }
    debug_assert_eq!(ca.len(), capacity);
    Ok(ca.with_name(name))
}
266
/// Unwraps one fallible element, remembering the first error instead of
/// short-circuiting.
///
/// `Ok(v)` yields `v` unchanged. An `Err` yields `None` and is stored in
/// `error` only if nothing has been stored yet, so the earliest failure wins
/// and `error` only ever holds `Some(Err(_))`. This lets a collection that
/// must consume every element keep going while still surfacing the first
/// failure afterwards.
///
/// Generic over the error type so the helper is not tied to `PyErr`; existing
/// callers infer `E` from their `PyResult` items.
fn catch_err<K, E>(
    error: &mut Option<Result<Option<K>, E>>,
    result: Result<Option<K>, E>,
) -> Option<K> {
    match result {
        Ok(item) => item,
        Err(e) => {
            if error.is_none() {
                *error = Some(Err(e));
            }
            None
        },
    }
}
278
279fn iterator_to_string<S: AsRef<str>>(
280    it: impl Iterator<Item = PyResult<Option<S>>>,
281    init_null_count: usize,
282    first_value: Option<S>,
283    name: PlSmallStr,
284    capacity: usize,
285) -> PyResult<StringChunked> {
286    let mut error = None;
287    // SAFETY: we know the iterators len.
288    let ca: StringChunked = unsafe {
289        if init_null_count > 0 {
290            (0..init_null_count)
291                .map(|_| Ok(None))
292                .chain(std::iter::once(Ok(first_value)))
293                .trust_my_length(capacity)
294                .map(|v| catch_err(&mut error, v))
295                .collect_trusted()
296        } else if first_value.is_some() {
297            std::iter::once(Ok(first_value))
298                .chain(it)
299                .trust_my_length(capacity)
300                .map(|v| catch_err(&mut error, v))
301                .collect_trusted()
302        } else {
303            it.map(|v| catch_err(&mut error, v)).collect()
304        }
305    };
306    debug_assert_eq!(ca.len(), capacity);
307    if let Some(err) = error {
308        let _ = err?;
309    }
310    Ok(ca.with_name(name))
311}
312
313fn iterator_to_list(
314    dt: &DataType,
315    it: impl Iterator<Item = PyResult<Option<Series>>>,
316    init_null_count: usize,
317    first_value: Option<&Series>,
318    name: PlSmallStr,
319    capacity: usize,
320) -> PyResult<ListChunked> {
321    let mut builder = get_list_builder(dt, capacity * 5, capacity, name);
322    for _ in 0..init_null_count {
323        builder.append_null()
324    }
325    if first_value.is_some() {
326        builder
327            .append_opt_series(first_value)
328            .map_err(PyPolarsErr::from)?;
329    }
330    for opt_val in it {
331        match opt_val? {
332            None => builder.append_null(),
333            Some(s) => {
334                if s.is_empty() && s.dtype() != dt {
335                    builder
336                        .append_series(&Series::full_null(PlSmallStr::EMPTY, 0, dt))
337                        .unwrap()
338                } else {
339                    builder.append_series(&s).map_err(PyPolarsErr::from)?
340                }
341            },
342        }
343    }
344    Ok(builder.finish())
345}