polars_python/map/mod.rs

1pub mod dataframe;
2pub mod lazy;
3pub mod series;
4
5use std::collections::BTreeMap;
6
7use arrow::bitmap::BitmapBuilder;
8use polars::chunked_array::builder::get_list_builder;
9use polars::prelude::*;
10use polars_core::utils::CustomIterTools;
11use polars_core::POOL;
12use polars_utils::pl_str::PlSmallStr;
13use pyo3::prelude::*;
14use pyo3::pybacked::PyBackedStr;
15use pyo3::types::PyDict;
16use rayon::prelude::*;
17
18use crate::error::PyPolarsErr;
19use crate::prelude::ObjectValue;
20use crate::{PySeries, Wrap};
21
22pub trait PyPolarsNumericType: PolarsNumericType {}
23
24impl PyPolarsNumericType for UInt8Type {}
25impl PyPolarsNumericType for UInt16Type {}
26impl PyPolarsNumericType for UInt32Type {}
27impl PyPolarsNumericType for UInt64Type {}
28impl PyPolarsNumericType for Int8Type {}
29impl PyPolarsNumericType for Int16Type {}
30impl PyPolarsNumericType for Int32Type {}
31impl PyPolarsNumericType for Int64Type {}
32impl PyPolarsNumericType for Int128Type {}
33impl PyPolarsNumericType for Float32Type {}
34impl PyPolarsNumericType for Float64Type {}
35
36fn iterator_to_struct<'a>(
37    py: Python,
38    it: impl Iterator<Item = PyResult<Option<Bound<'a, PyAny>>>>,
39    init_null_count: usize,
40    first_value: AnyValue<'a>,
41    name: PlSmallStr,
42    capacity: usize,
43) -> PyResult<PySeries> {
44    let (vals, flds) = match &first_value {
45        av @ AnyValue::Struct(_, _, flds) => (av._iter_struct_av().collect::<Vec<_>>(), &**flds),
46        AnyValue::StructOwned(payload) => (payload.0.clone(), &*payload.1),
47        _ => {
48            return Err(crate::exceptions::ComputeError::new_err(format!(
49                "expected struct got {first_value:?}",
50            )))
51        },
52    };
53
54    // Every item in the struct is kept as its own buffer of AnyValues.
55    // So a struct with 2 items: {a, b} will have:
56    // [
57    //      [ a values ]
58    //      [ b values ]
59    // ]
60    let mut struct_fields: BTreeMap<PlSmallStr, Vec<AnyValue>> = BTreeMap::new();
61
62    // As a BTreeMap sorts its keys, we also need to track the original
63    // order of the field names.
64    let mut field_names_ordered: Vec<PlSmallStr> = Vec::with_capacity(flds.len());
65
66    // Use the first value and the known null count to initialize the buffers
67    // if we find a new key later on, we make a new entry in the BTree.
68    for (value, fld) in vals.into_iter().zip(flds) {
69        let mut buf = Vec::with_capacity(capacity);
70        buf.extend((0..init_null_count).map(|_| AnyValue::Null));
71        buf.push(value);
72        field_names_ordered.push(fld.name().clone());
73        struct_fields.insert(fld.name().clone(), buf);
74    }
75
76    let mut validity = BitmapBuilder::with_capacity(capacity);
77    validity.extend_constant(init_null_count, false);
78    validity.push(true);
79
80    for dict in it {
81        match dict? {
82            None => {
83                validity.push(false);
84                for field_items in struct_fields.values_mut() {
85                    field_items.push(AnyValue::Null);
86                }
87            },
88            Some(dict) => {
89                validity.push(true);
90                let dict = dict.downcast::<PyDict>()?;
91                let current_len = struct_fields
92                    .values()
93                    .next()
94                    .map(|buf| buf.len())
95                    .unwrap_or(0);
96
97                // We ignore the keys of the rest of the dicts,
98                // the first item determines the output name.
99                for (key, val) in dict.iter() {
100                    let key = key.str().unwrap().extract::<PyBackedStr>().unwrap();
101                    let item = val.extract::<Wrap<AnyValue>>()?;
102                    if let Some(buf) = struct_fields.get_mut(&*key) {
103                        buf.push(item.0);
104                    } else {
105                        let mut buf = Vec::with_capacity(capacity);
106                        buf.extend((0..init_null_count + current_len).map(|_| AnyValue::Null));
107                        buf.push(item.0);
108                        let key: PlSmallStr = (&*key).into();
109                        field_names_ordered.push(key.clone());
110                        struct_fields.insert(key, buf);
111                    };
112                }
113
114                // Add nulls to keys that were not in the dict.
115                if dict.len() < struct_fields.len() {
116                    let current_len = current_len + 1;
117                    for buf in struct_fields.values_mut() {
118                        if buf.len() < current_len {
119                            buf.push(AnyValue::Null)
120                        }
121                    }
122                }
123            },
124        }
125    }
126
127    let fields = py.allow_threads(|| {
128        POOL.install(|| {
129            field_names_ordered
130                .par_iter()
131                .map(|name| Series::new(name.clone(), struct_fields.get(name).unwrap()))
132                .collect::<Vec<_>>()
133        })
134    });
135
136    Ok(
137        StructChunked::from_series(name, fields[0].len(), fields.iter())
138            .unwrap()
139            .with_outer_validity(validity.into_opt_validity())
140            .into_series()
141            .into(),
142    )
143}
144
145fn iterator_to_primitive<T>(
146    it: impl Iterator<Item = PyResult<Option<T::Native>>>,
147    init_null_count: usize,
148    first_value: Option<T::Native>,
149    name: PlSmallStr,
150    capacity: usize,
151) -> PyResult<ChunkedArray<T>>
152where
153    T: PyPolarsNumericType,
154{
155    let mut error = None;
156    // SAFETY: we know the iterators len.
157    let ca: ChunkedArray<T> = unsafe {
158        if init_null_count > 0 {
159            (0..init_null_count)
160                .map(|_| Ok(None))
161                .chain(std::iter::once(Ok(first_value)))
162                .chain(it)
163                .trust_my_length(capacity)
164                .map(|v| catch_err(&mut error, v))
165                .collect_trusted()
166        } else if first_value.is_some() {
167            std::iter::once(Ok(first_value))
168                .chain(it)
169                .trust_my_length(capacity)
170                .map(|v| catch_err(&mut error, v))
171                .collect_trusted()
172        } else {
173            it.map(|v| catch_err(&mut error, v)).collect()
174        }
175    };
176    debug_assert_eq!(ca.len(), capacity);
177
178    if let Some(err) = error {
179        let _ = err?;
180    }
181    Ok(ca.with_name(name))
182}
183
184fn iterator_to_bool(
185    it: impl Iterator<Item = PyResult<Option<bool>>>,
186    init_null_count: usize,
187    first_value: Option<bool>,
188    name: PlSmallStr,
189    capacity: usize,
190) -> PyResult<ChunkedArray<BooleanType>> {
191    let mut error = None;
192    // SAFETY: we know the iterators len.
193    let ca: BooleanChunked = unsafe {
194        if init_null_count > 0 {
195            (0..init_null_count)
196                .map(|_| Ok(None))
197                .chain(std::iter::once(Ok(first_value)))
198                .chain(it)
199                .trust_my_length(capacity)
200                .map(|v| catch_err(&mut error, v))
201                .collect_trusted()
202        } else if first_value.is_some() {
203            std::iter::once(Ok(first_value))
204                .chain(it)
205                .trust_my_length(capacity)
206                .map(|v| catch_err(&mut error, v))
207                .collect_trusted()
208        } else {
209            it.map(|v| catch_err(&mut error, v)).collect()
210        }
211    };
212    if let Some(err) = error {
213        let _ = err?;
214    }
215    debug_assert_eq!(ca.len(), capacity);
216    Ok(ca.with_name(name))
217}
218
/// Collect an iterator of optional Python objects into an `ObjectChunked<ObjectValue>`.
///
/// The output is `init_null_count` nulls, then `first_value`, then every item of `it`.
/// Exception: when `init_null_count == 0` and `first_value` is `None`, `first_value` is
/// not emitted and only `it` is collected.
///
/// Errors yielded by `it` become nulls while collecting; the first one is returned after
/// the whole iterator has been consumed.
#[cfg(feature = "object")]
fn iterator_to_object(
    it: impl Iterator<Item = PyResult<Option<ObjectValue>>>,
    init_null_count: usize,
    first_value: Option<ObjectValue>,
    name: PlSmallStr,
    capacity: usize,
) -> PyResult<ObjectChunked<ObjectValue>> {
    let mut first_err = None;
    let ca: ObjectChunked<ObjectValue> = if init_null_count == 0 && first_value.is_none() {
        it.map(|v| catch_err(&mut first_err, v)).collect()
    } else {
        // An empty null prefix makes this chain equivalent to "first + rest".
        let null_prefix = std::iter::repeat_with(|| -> PyResult<Option<ObjectValue>> { Ok(None) })
            .take(init_null_count);
        let values = null_prefix
            .chain(std::iter::once(Ok(first_value)))
            .chain(it)
            .map(|v| catch_err(&mut first_err, v));
        // SAFETY: `capacity` is assumed to be the exact number of items in `values`;
        // the debug assertion below checks the collected length.
        unsafe { values.trust_my_length(capacity).collect_trusted() }
    };

    if let Some(Err(e)) = first_err {
        return Err(e);
    }
    debug_assert_eq!(ca.len(), capacity);
    Ok(ca.with_name(name))
}
254
255fn catch_err<K>(error: &mut Option<PyResult<Option<K>>>, result: PyResult<Option<K>>) -> Option<K> {
256    match result {
257        Ok(item) => item,
258        err => {
259            if error.is_none() {
260                *error = Some(err);
261            }
262            None
263        },
264    }
265}
266
267fn iterator_to_string<S: AsRef<str>>(
268    it: impl Iterator<Item = PyResult<Option<S>>>,
269    init_null_count: usize,
270    first_value: Option<S>,
271    name: PlSmallStr,
272    capacity: usize,
273) -> PyResult<StringChunked> {
274    let mut error = None;
275    // SAFETY: we know the iterators len.
276    let ca: StringChunked = unsafe {
277        if init_null_count > 0 {
278            (0..init_null_count)
279                .map(|_| Ok(None))
280                .chain(std::iter::once(Ok(first_value)))
281                .trust_my_length(capacity)
282                .map(|v| catch_err(&mut error, v))
283                .collect_trusted()
284        } else if first_value.is_some() {
285            std::iter::once(Ok(first_value))
286                .chain(it)
287                .trust_my_length(capacity)
288                .map(|v| catch_err(&mut error, v))
289                .collect_trusted()
290        } else {
291            it.map(|v| catch_err(&mut error, v)).collect()
292        }
293    };
294    debug_assert_eq!(ca.len(), capacity);
295    if let Some(err) = error {
296        let _ = err?;
297    }
298    Ok(ca.with_name(name))
299}
300
301fn iterator_to_list(
302    dt: &DataType,
303    it: impl Iterator<Item = PyResult<Option<Series>>>,
304    init_null_count: usize,
305    first_value: Option<&Series>,
306    name: PlSmallStr,
307    capacity: usize,
308) -> PyResult<ListChunked> {
309    let mut builder = get_list_builder(dt, capacity * 5, capacity, name);
310    for _ in 0..init_null_count {
311        builder.append_null()
312    }
313    if first_value.is_some() {
314        builder
315            .append_opt_series(first_value)
316            .map_err(PyPolarsErr::from)?;
317    }
318    for opt_val in it {
319        match opt_val? {
320            None => builder.append_null(),
321            Some(s) => {
322                if s.len() == 0 && s.dtype() != dt {
323                    builder
324                        .append_series(&Series::full_null(PlSmallStr::EMPTY, 0, dt))
325                        .unwrap()
326                } else {
327                    builder.append_series(&s).map_err(PyPolarsErr::from)?
328                }
329            },
330        }
331    }
332    Ok(builder.finish())
333}