1pub mod dataframe;
2pub mod lazy;
3pub mod series;
4
5use std::collections::BTreeMap;
6
7use arrow::bitmap::BitmapBuilder;
8use polars::chunked_array::builder::get_list_builder;
9use polars::prelude::*;
10use polars_core::utils::CustomIterTools;
11use polars_core::POOL;
12use polars_utils::pl_str::PlSmallStr;
13use pyo3::prelude::*;
14use pyo3::pybacked::PyBackedStr;
15use pyo3::types::PyDict;
16use rayon::prelude::*;
17
18use crate::error::PyPolarsErr;
19use crate::prelude::ObjectValue;
20use crate::{PySeries, Wrap};
21
22pub trait PyPolarsNumericType: PolarsNumericType {}
23
24impl PyPolarsNumericType for UInt8Type {}
25impl PyPolarsNumericType for UInt16Type {}
26impl PyPolarsNumericType for UInt32Type {}
27impl PyPolarsNumericType for UInt64Type {}
28impl PyPolarsNumericType for Int8Type {}
29impl PyPolarsNumericType for Int16Type {}
30impl PyPolarsNumericType for Int32Type {}
31impl PyPolarsNumericType for Int64Type {}
32impl PyPolarsNumericType for Int128Type {}
33impl PyPolarsNumericType for Float32Type {}
34impl PyPolarsNumericType for Float64Type {}
35
36fn iterator_to_struct<'a>(
37 py: Python,
38 it: impl Iterator<Item = PyResult<Option<Bound<'a, PyAny>>>>,
39 init_null_count: usize,
40 first_value: AnyValue<'a>,
41 name: PlSmallStr,
42 capacity: usize,
43) -> PyResult<PySeries> {
44 let (vals, flds) = match &first_value {
45 av @ AnyValue::Struct(_, _, flds) => (av._iter_struct_av().collect::<Vec<_>>(), &**flds),
46 AnyValue::StructOwned(payload) => (payload.0.clone(), &*payload.1),
47 _ => {
48 return Err(crate::exceptions::ComputeError::new_err(format!(
49 "expected struct got {first_value:?}",
50 )))
51 },
52 };
53
54 let mut struct_fields: BTreeMap<PlSmallStr, Vec<AnyValue>> = BTreeMap::new();
61
62 let mut field_names_ordered: Vec<PlSmallStr> = Vec::with_capacity(flds.len());
65
66 for (value, fld) in vals.into_iter().zip(flds) {
69 let mut buf = Vec::with_capacity(capacity);
70 buf.extend((0..init_null_count).map(|_| AnyValue::Null));
71 buf.push(value);
72 field_names_ordered.push(fld.name().clone());
73 struct_fields.insert(fld.name().clone(), buf);
74 }
75
76 let mut validity = BitmapBuilder::with_capacity(capacity);
77 validity.extend_constant(init_null_count, false);
78 validity.push(true);
79
80 for dict in it {
81 match dict? {
82 None => {
83 validity.push(false);
84 for field_items in struct_fields.values_mut() {
85 field_items.push(AnyValue::Null);
86 }
87 },
88 Some(dict) => {
89 validity.push(true);
90 let dict = dict.downcast::<PyDict>()?;
91 let current_len = struct_fields
92 .values()
93 .next()
94 .map(|buf| buf.len())
95 .unwrap_or(0);
96
97 for (key, val) in dict.iter() {
100 let key = key.str().unwrap().extract::<PyBackedStr>().unwrap();
101 let item = val.extract::<Wrap<AnyValue>>()?;
102 if let Some(buf) = struct_fields.get_mut(&*key) {
103 buf.push(item.0);
104 } else {
105 let mut buf = Vec::with_capacity(capacity);
106 buf.extend((0..init_null_count + current_len).map(|_| AnyValue::Null));
107 buf.push(item.0);
108 let key: PlSmallStr = (&*key).into();
109 field_names_ordered.push(key.clone());
110 struct_fields.insert(key, buf);
111 };
112 }
113
114 if dict.len() < struct_fields.len() {
116 let current_len = current_len + 1;
117 for buf in struct_fields.values_mut() {
118 if buf.len() < current_len {
119 buf.push(AnyValue::Null)
120 }
121 }
122 }
123 },
124 }
125 }
126
127 let fields = py.allow_threads(|| {
128 POOL.install(|| {
129 field_names_ordered
130 .par_iter()
131 .map(|name| Series::new(name.clone(), struct_fields.get(name).unwrap()))
132 .collect::<Vec<_>>()
133 })
134 });
135
136 Ok(
137 StructChunked::from_series(name, fields[0].len(), fields.iter())
138 .unwrap()
139 .with_outer_validity(validity.into_opt_validity())
140 .into_series()
141 .into(),
142 )
143}
144
145fn iterator_to_primitive<T>(
146 it: impl Iterator<Item = PyResult<Option<T::Native>>>,
147 init_null_count: usize,
148 first_value: Option<T::Native>,
149 name: PlSmallStr,
150 capacity: usize,
151) -> PyResult<ChunkedArray<T>>
152where
153 T: PyPolarsNumericType,
154{
155 let mut error = None;
156 let ca: ChunkedArray<T> = unsafe {
158 if init_null_count > 0 {
159 (0..init_null_count)
160 .map(|_| Ok(None))
161 .chain(std::iter::once(Ok(first_value)))
162 .chain(it)
163 .trust_my_length(capacity)
164 .map(|v| catch_err(&mut error, v))
165 .collect_trusted()
166 } else if first_value.is_some() {
167 std::iter::once(Ok(first_value))
168 .chain(it)
169 .trust_my_length(capacity)
170 .map(|v| catch_err(&mut error, v))
171 .collect_trusted()
172 } else {
173 it.map(|v| catch_err(&mut error, v)).collect()
174 }
175 };
176 debug_assert_eq!(ca.len(), capacity);
177
178 if let Some(err) = error {
179 let _ = err?;
180 }
181 Ok(ca.with_name(name))
182}
183
184fn iterator_to_bool(
185 it: impl Iterator<Item = PyResult<Option<bool>>>,
186 init_null_count: usize,
187 first_value: Option<bool>,
188 name: PlSmallStr,
189 capacity: usize,
190) -> PyResult<ChunkedArray<BooleanType>> {
191 let mut error = None;
192 let ca: BooleanChunked = unsafe {
194 if init_null_count > 0 {
195 (0..init_null_count)
196 .map(|_| Ok(None))
197 .chain(std::iter::once(Ok(first_value)))
198 .chain(it)
199 .trust_my_length(capacity)
200 .map(|v| catch_err(&mut error, v))
201 .collect_trusted()
202 } else if first_value.is_some() {
203 std::iter::once(Ok(first_value))
204 .chain(it)
205 .trust_my_length(capacity)
206 .map(|v| catch_err(&mut error, v))
207 .collect_trusted()
208 } else {
209 it.map(|v| catch_err(&mut error, v)).collect()
210 }
211 };
212 if let Some(err) = error {
213 let _ = err?;
214 }
215 debug_assert_eq!(ca.len(), capacity);
216 Ok(ca.with_name(name))
217}
218
/// Collects a stream of optional Python objects into an `ObjectChunked<ObjectValue>`.
///
/// When `init_null_count > 0` or `first_value` is `Some`, the layout is `init_null_count` nulls,
/// then `first_value`, then every item of `it`. Otherwise only the items of `it` are collected.
///
/// An `Err` item becomes a null slot, and the first such error is returned once collection ends.
#[cfg(feature = "object")]
fn iterator_to_object(
    it: impl Iterator<Item = PyResult<Option<ObjectValue>>>,
    init_null_count: usize,
    first_value: Option<ObjectValue>,
    name: PlSmallStr,
    capacity: usize,
) -> PyResult<ObjectChunked<ObjectValue>> {
    let mut first_err = None;

    let ca: ObjectChunked<ObjectValue> = if init_null_count > 0 || first_value.is_some() {
        let values = (0..init_null_count)
            .map(|_| Ok(None))
            .chain(std::iter::once(Ok(first_value)))
            .chain(it)
            .map(|v| catch_err(&mut first_err, v));
        // SAFETY: relies on the caller passing `capacity` equal to the number of items produced
        // (`init_null_count + 1 + <items in it>`); only the debug assertion below checks this.
        unsafe { values.trust_my_length(capacity).collect_trusted() }
    } else {
        it.map(|v| catch_err(&mut first_err, v)).collect()
    };

    if let Some(Err(err)) = first_err {
        return Err(err);
    }
    debug_assert_eq!(ca.len(), capacity);
    Ok(ca.with_name(name))
}
254
255fn catch_err<K>(error: &mut Option<PyResult<Option<K>>>, result: PyResult<Option<K>>) -> Option<K> {
256 match result {
257 Ok(item) => item,
258 err => {
259 if error.is_none() {
260 *error = Some(err);
261 }
262 None
263 },
264 }
265}
266
267fn iterator_to_string<S: AsRef<str>>(
268 it: impl Iterator<Item = PyResult<Option<S>>>,
269 init_null_count: usize,
270 first_value: Option<S>,
271 name: PlSmallStr,
272 capacity: usize,
273) -> PyResult<StringChunked> {
274 let mut error = None;
275 let ca: StringChunked = unsafe {
277 if init_null_count > 0 {
278 (0..init_null_count)
279 .map(|_| Ok(None))
280 .chain(std::iter::once(Ok(first_value)))
281 .trust_my_length(capacity)
282 .map(|v| catch_err(&mut error, v))
283 .collect_trusted()
284 } else if first_value.is_some() {
285 std::iter::once(Ok(first_value))
286 .chain(it)
287 .trust_my_length(capacity)
288 .map(|v| catch_err(&mut error, v))
289 .collect_trusted()
290 } else {
291 it.map(|v| catch_err(&mut error, v)).collect()
292 }
293 };
294 debug_assert_eq!(ca.len(), capacity);
295 if let Some(err) = error {
296 let _ = err?;
297 }
298 Ok(ca.with_name(name))
299}
300
301fn iterator_to_list(
302 dt: &DataType,
303 it: impl Iterator<Item = PyResult<Option<Series>>>,
304 init_null_count: usize,
305 first_value: Option<&Series>,
306 name: PlSmallStr,
307 capacity: usize,
308) -> PyResult<ListChunked> {
309 let mut builder = get_list_builder(dt, capacity * 5, capacity, name);
310 for _ in 0..init_null_count {
311 builder.append_null()
312 }
313 if first_value.is_some() {
314 builder
315 .append_opt_series(first_value)
316 .map_err(PyPolarsErr::from)?;
317 }
318 for opt_val in it {
319 match opt_val? {
320 None => builder.append_null(),
321 Some(s) => {
322 if s.len() == 0 && s.dtype() != dt {
323 builder
324 .append_series(&Series::full_null(PlSmallStr::EMPTY, 0, dt))
325 .unwrap()
326 } else {
327 builder.append_series(&s).map_err(PyPolarsErr::from)?
328 }
329 },
330 }
331 }
332 Ok(builder.finish())
333}