polars_core/chunked_array/ops/nesting_utils.rs

use arrow::array::{Array, IntoBoxedArray};
use polars_compute::find_validity_mismatch::find_validity_mismatch;
use polars_utils::IdxSize;

use super::ListChunked;
use crate::chunked_array::flags::StatisticsFlags;
use crate::prelude::{ChunkedArray, FalseT, PolarsDataType};
use crate::series::Series;
use crate::series::implementations::null::NullChunked;
use crate::utils::align_chunks_binary_ca_series;

/// Utility methods for dealing with nested chunked arrays.
pub trait ChunkNestingUtils: Sized {
    /// Propagate nulls of a nested datatype to all levels of nesting.
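    ///
    /// Returns `None` when no chunk needed changing; the corresponding statistics
    /// flag is then set so later calls can short-circuit.
    ///
    /// Illustrative sketch of the effect on a `List(Int64)` column (hypothetical
    /// data, not a doctest):
    ///
    /// ```text
    /// row validity:    [valid, null, valid]
    /// inner values:     1, 2 | 9, 9 | 3         // the null row still stores physical values
    /// after propagate:  1, 2 | null, null | 3   // inner validity is masked under the null row
    /// ```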
    fn propagate_nulls(&self) -> Option<Self>;

    /// Recursively trim unused start and end elements from all lists.
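    ///
    /// Returns `None` when every chunk already has normalized offsets.
    ///
    /// Illustrative offsets example (hypothetical data): a list chunk whose offsets
    /// are `[2, 4, 6]` over a values buffer of length 10 only ever references
    /// `values[2..6]`; trimming slices the values and rewrites the offsets to
    /// `[0, 2, 4]`.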
    fn trim_lists_to_normalized_offsets(&self) -> Option<Self>;

    /// Find the indices of the values where the validities of `self` and `other` mismatch.
    ///
    /// This is done recursively.
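    ///
    /// The indices pushed into `idxs` are positions in `self`; chunk-local indices
    /// are shifted by the chunk offsets. For example (illustrative data), comparing
    /// validities `[valid, null, valid]` against `[valid, valid, valid]` pushes `1`.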
    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>);
}

impl ChunkNestingUtils for ListChunked {
    fn propagate_nulls(&self) -> Option<Self> {
        use polars_compute::propagate_nulls::propagate_nulls_list;

        let flags = self.get_flags();

        if flags.has_propagated_nulls() {
            return None;
        }

        if !self.inner_dtype().is_nested() && !self.has_nulls() {
            self.flags
                .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return None;
        }

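        // Lazily rebuild the chunk list: scan until the first chunk that actually
        // needs null propagation, clone the untouched prefix as-is, then map the
        // remaining chunks below. If no chunk changes, `chunks` stays empty and we
        // only set the statistics flag.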
        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(propagated_chunk) = propagate_nulls_list(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(propagated_chunk.into_boxed());
                break;
            }
        }

        // If we found a chunk that needs propagating, create a new ListChunked
        if !chunks.is_empty() {
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match propagate_nulls_list(chunk) {
                    None => chunk.to_boxed(),
                    Some(chunk) => chunk.into_boxed(),
                }
            }));

            // SAFETY: The length and null_count should remain the same.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return Some(ca);
        }

        self.flags
            .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
        None
    }

    fn trim_lists_to_normalized_offsets(&self) -> Option<Self> {
        use polars_compute::trim_lists_to_normalized_offsets::trim_lists_to_normalized_offsets_list;

        let flags = self.get_flags();

        if flags.has_trimmed_lists_to_normalized_offsets() {
            return None;
        }

        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(trimmed) = trim_lists_to_normalized_offsets_list(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(trimmed.into_boxed());
                break;
            }
        }

        // If we found a chunk that needs compacting, create a new ListChunked
        if !chunks.is_empty() {
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match trim_lists_to_normalized_offsets_list(chunk) {
                    Some(chunk) => chunk.into_boxed(),
                    None => chunk.to_boxed(),
                }
            }));

            // SAFETY: The length and null_count should remain the same.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };

            ca.set_flags(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
            return Some(ca);
        }

        self.flags
            .set(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
        None
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        let (slf, other) = align_chunks_binary_ca_series(self, other);
        let mut offset: IdxSize = 0;
        for (l, r) in slf.downcast_iter().zip(other.chunks()) {
            let start_length = idxs.len();
            find_validity_mismatch(l, r.as_ref(), idxs);
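            // Indices reported for this chunk pair are chunk-local; shift them so
            // they index into the full ChunkedArray.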
            for idx in idxs[start_length..].iter_mut() {
                *idx += offset;
            }
            offset += l.len() as IdxSize;
        }
    }
}

#[cfg(feature = "dtype-array")]
impl ChunkNestingUtils for super::ArrayChunked {
    fn propagate_nulls(&self) -> Option<Self> {
        use polars_compute::propagate_nulls::propagate_nulls_fsl;

        let flags = self.get_flags();

        if flags.has_propagated_nulls() {
            return None;
        }

        if !self.inner_dtype().is_nested() && !self.has_nulls() {
            self.flags
                .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return None;
        }

        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(propagated_chunk) = propagate_nulls_fsl(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(propagated_chunk.into_boxed());
                break;
            }
        }

        // If we found a chunk that needs propagating, create a new ArrayChunked
        if !chunks.is_empty() {
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match propagate_nulls_fsl(chunk) {
                    None => chunk.to_boxed(),
                    Some(chunk) => chunk.into_boxed(),
                }
            }));

            // SAFETY: The length and null_count should remain the same.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return Some(ca);
        }

        self.flags
            .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
        None
    }

    fn trim_lists_to_normalized_offsets(&self) -> Option<Self> {
        use polars_compute::trim_lists_to_normalized_offsets::trim_lists_to_normalized_offsets_fsl;

        let flags = self.get_flags();

        if flags.has_trimmed_lists_to_normalized_offsets()
            || !self.inner_dtype().contains_list_recursive()
        {
            return None;
        }

        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(trimmed) = trim_lists_to_normalized_offsets_fsl(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(trimmed.into_boxed());
                break;
            }
        }

        // If we found a chunk that needs compacting, create a new ArrayChunked
        if !chunks.is_empty() {
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match trim_lists_to_normalized_offsets_fsl(chunk) {
                    Some(chunk) => chunk.into_boxed(),
                    None => chunk.to_boxed(),
                }
            }));

            // SAFETY: The length and null_count should remain the same.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
            return Some(ca);
        }

        self.flags
            .set(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
        None
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        let (slf, other) = align_chunks_binary_ca_series(self, other);
        let mut offset: IdxSize = 0;
        for (l, r) in slf.downcast_iter().zip(other.chunks()) {
            let start_length = idxs.len();
            find_validity_mismatch(l, r.as_ref(), idxs);
            for idx in idxs[start_length..].iter_mut() {
                *idx += offset;
            }
            offset += l.len() as IdxSize;
        }
    }
}

#[cfg(feature = "dtype-struct")]
impl ChunkNestingUtils for super::StructChunked {
    fn propagate_nulls(&self) -> Option<Self> {
        use polars_compute::propagate_nulls::propagate_nulls_struct;

        let flags = self.get_flags();

        if flags.has_propagated_nulls() {
            return None;
        }

        if self.struct_fields().iter().all(|f| !f.dtype().is_nested()) && !self.has_nulls() {
            self.flags
                .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return None;
        }

        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(propagated_chunk) = propagate_nulls_struct(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(propagated_chunk.into_boxed());
                break;
            }
        }

        // If we found a chunk that needs propagating, create a new StructChunked
        if !chunks.is_empty() {
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match propagate_nulls_struct(chunk) {
                    None => chunk.to_boxed(),
                    Some(chunk) => chunk.into_boxed(),
                }
            }));

            // SAFETY: The length and null_count should remain the same.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
            return Some(ca);
        }

        self.flags
            .set(flags | StatisticsFlags::HAS_PROPAGATED_NULLS);
        None
    }

    fn trim_lists_to_normalized_offsets(&self) -> Option<Self> {
        use polars_compute::trim_lists_to_normalized_offsets::trim_lists_to_normalized_offsets_struct;

        let flags = self.get_flags();

        if flags.has_trimmed_lists_to_normalized_offsets()
            || !self
                .struct_fields()
                .iter()
                .any(|f| f.dtype().contains_list_recursive())
        {
            return None;
        }

        let mut chunks = Vec::new();
        for (i, chunk) in self.downcast_iter().enumerate() {
            if let Some(trimmed) = trim_lists_to_normalized_offsets_struct(chunk) {
                chunks.reserve(self.chunks.len());
                chunks.extend(self.chunks[..i].iter().cloned());
                chunks.push(trimmed.into_boxed());
                break;
            }
        }

        // If we found a chunk that needs compacting, create a new StructChunked
        if !chunks.is_empty() {
            chunks.extend(self.downcast_iter().skip(chunks.len()).map(|chunk| {
                match trim_lists_to_normalized_offsets_struct(chunk) {
                    Some(chunk) => chunk.into_boxed(),
                    None => chunk.to_boxed(),
                }
            }));

            // SAFETY: The length and null_count should remain the same.
            let mut ca = unsafe {
                Self::new_with_dims(self.field.clone(), chunks, self.length, self.null_count)
            };
            ca.set_flags(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
            return Some(ca);
        }

        self.flags
            .set(flags | StatisticsFlags::HAS_TRIMMED_LISTS_TO_NORMALIZED_OFFSETS);
        None
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        let (slf, other) = align_chunks_binary_ca_series(self, other);
        let mut offset: IdxSize = 0;
        for (l, r) in slf.downcast_iter().zip(other.chunks()) {
            let start_length = idxs.len();
            find_validity_mismatch(l, r.as_ref(), idxs);
            for idx in idxs[start_length..].iter_mut() {
                *idx += offset;
            }
            offset += l.len() as IdxSize;
        }
    }
}

impl<T: PolarsDataType<IsNested = FalseT>> ChunkNestingUtils for ChunkedArray<T> {
    fn propagate_nulls(&self) -> Option<Self> {
        None
    }

    fn trim_lists_to_normalized_offsets(&self) -> Option<Self> {
        None
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        let slf_nc = self.null_count();
        let other_nc = other.null_count();

        // Fast path for non-nested datatypes.
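        // Equal null counts where both sides are either fully valid or fully null
        // mean the validities are trivially identical, so no per-chunk scan is needed.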
        if slf_nc == other_nc && (slf_nc == 0 || slf_nc == self.len()) {
            return;
        }

        let (slf, other) = align_chunks_binary_ca_series(self, other);
        let mut offset: IdxSize = 0;
        for (l, r) in slf.downcast_iter().zip(other.chunks()) {
            let start_length = idxs.len();
            find_validity_mismatch(l, r.as_ref(), idxs);
            for idx in idxs[start_length..].iter_mut() {
                *idx += offset;
            }
            offset += l.len() as IdxSize;
        }
    }
}

impl ChunkNestingUtils for NullChunked {
    fn propagate_nulls(&self) -> Option<Self> {
        None
    }

    fn trim_lists_to_normalized_offsets(&self) -> Option<Self> {
        None
    }

    fn find_validity_mismatch(&self, other: &Series, idxs: &mut Vec<IdxSize>) {
        let other_nc = other.null_count();

        // Fast path for non-nested datatypes.
        if other_nc == self.len() {
            return;
        }

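        // `self` is entirely null, so every valid position in `other` is a mismatch.
        // A missing validity map means `other` is fully valid.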
        match other.rechunk_validity() {
            None => idxs.extend(0..self.len() as IdxSize),
            Some(v) => idxs.extend(v.true_idx_iter().map(|v| v as IdxSize)),
        }
    }
}