Skip to main content

re_chunk/
iter.rs

1use std::sync::Arc;
2
3use arrow::array::{
4    Array as ArrowArray, ArrayRef as ArrowArrayRef, ArrowPrimitiveType, BinaryArray,
5    BooleanArray as ArrowBooleanArray, FixedSizeListArray as ArrowFixedSizeListArray,
6    LargeBinaryArray, ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray,
7    StringArray as ArrowStringArray, StructArray as ArrowStructArray,
8};
9use arrow::buffer::{
10    BooleanBuffer as ArrowBooleanBuffer, Buffer, ScalarBuffer as ArrowScalarBuffer,
11};
12use arrow::datatypes::ArrowNativeType;
13use itertools::{Either, Itertools as _, izip};
14use re_arrow_util::ArrowArrayDowncastRef as _;
15use re_log_types::{TimeInt, TimePoint, TimelineName};
16use re_span::Span;
17use re_types_core::{ArrowString, Component, ComponentIdentifier};
18
19use crate::{Chunk, RowId, TimeColumn};
20
21// ---
22
/// Reports that the Arrow array backing `component` could not be downcast to `target`.
///
/// * `component`: the component whose data was being sliced.
/// * `target`: human-readable name of the Arrow array type that was expected.
/// * `actual`: the data type the offending array reported.
///
/// The exact same message is handed to `re_log::debug_panic!` and then to
/// `re_log::error_once!`. Nothing is returned: as the message says, it is up to the caller to
/// discard the data (every caller in this file returns an empty iterator right after).
fn error_on_downcast_failure(
    component: ComponentIdentifier,
    target: &str,
    actual: &arrow::datatypes::DataType,
) {
    // NOTE(review): judging by its name, `debug_panic!` presumably only panics in debug
    // builds, which would make the `error_once!` below the release-build path — confirm
    // against `re_log`.
    re_log::debug_panic!(
        "downcast to {target} failed for {component}. Array data type was {actual:?}. Data discarded"
    );
    re_log::error_once!(
        "downcast to {target} failed for {component}. Array data type was {actual:?}. Data discarded"
    );
}
35
36// ---
37
38// NOTE: Regarding the use of (recursive) `Either` in this file: it is _not_ arbitrary.
39//
40// They _should_ all follow this model:
41// * The first layer is always the emptiness layer: `Left` is empty, `Right` is non-empty.
// * The second layer is the temporality layer: `Left` is static, `Right` is temporal.
43// * Any layers beyond that follow the same pattern: `Left` doesn't have something, while `Right` does.
44
45impl Chunk {
46    /// Return the raw component list array values for a given component.
47    ///
48    /// Use with great care: Component data may have arbitrary gaps.
49    pub fn raw_component_array(&self, component: ComponentIdentifier) -> Option<&ArrowArrayRef> {
50        self.components
51            .get_array(component)
52            .map(|list_array| list_array.values())
53    }
54
55    /// Returns an iterator over the indices (`(TimeInt, RowId)`) of a [`Chunk`], for a given timeline.
56    ///
57    /// If the chunk is static, `timeline` will be ignored.
58    ///
59    /// See also:
60    /// * [`Self::iter_component_indices`].
61    /// * [`Self::iter_indices_owned`].
62    #[inline]
63    pub fn iter_indices(
64        &self,
65        timeline: &TimelineName,
66    ) -> impl Iterator<Item = (TimeInt, RowId)> + '_ + use<'_> {
67        if self.is_static() {
68            Either::Right(Either::Left(izip!(
69                std::iter::repeat(TimeInt::STATIC),
70                self.row_ids()
71            )))
72        } else {
73            let Some(time_column) = self.timelines.get(timeline) else {
74                return Either::Left(std::iter::empty());
75            };
76
77            Either::Right(Either::Right(izip!(time_column.times(), self.row_ids())))
78        }
79    }
80
81    /// Returns an iterator over the indices (`(TimeInt, RowId)`) of a [`Chunk`], for a given
82    /// timeline and component.
83    ///
84    /// If the chunk is static, `timeline` will be ignored.
85    ///
86    /// This is different than [`Self::iter_indices`] in that it will only yield indices for rows
87    /// at which there is data for the specified component.
88    ///
89    /// See also [`Self::iter_indices`].
90    pub fn iter_component_indices(
91        &self,
92        timeline: TimelineName,
93        component: ComponentIdentifier,
94    ) -> impl Iterator<Item = (TimeInt, RowId)> + '_ + use<'_> {
95        let Some(list_array) = self.components.get_array(component) else {
96            return Either::Left(std::iter::empty());
97        };
98
99        if self.is_static() {
100            let indices = izip!(std::iter::repeat(TimeInt::STATIC), self.row_ids());
101
102            if let Some(validity) = list_array.nulls() {
103                Either::Right(Either::Left(Either::Left(
104                    indices
105                        .enumerate()
106                        .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
107                )))
108            } else {
109                Either::Right(Either::Left(Either::Right(indices)))
110            }
111        } else {
112            let Some(time_column) = self.timelines.get(&timeline) else {
113                return Either::Left(std::iter::empty());
114            };
115
116            let indices = izip!(time_column.times(), self.row_ids());
117
118            if let Some(validity) = list_array.nulls() {
119                Either::Right(Either::Right(Either::Left(
120                    indices
121                        .enumerate()
122                        .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
123                )))
124            } else {
125                Either::Right(Either::Right(Either::Right(indices)))
126            }
127        }
128    }
129
130    /// Returns an iterator over the [`TimePoint`]s of a [`Chunk`].
131    ///
132    /// See also:
133    /// * [`Self::iter_component_timepoints`].
134    #[inline]
135    pub fn iter_timepoints(&self) -> impl Iterator<Item = TimePoint> + '_ {
136        let mut timelines = self
137            .timelines
138            .values()
139            .map(|time_column| (time_column.timeline, time_column.times()))
140            .collect_vec();
141
142        std::iter::from_fn(move || {
143            let mut timepoint = TimePoint::default();
144            for (timeline, times) in &mut timelines {
145                timepoint.insert(*timeline, times.next()?);
146            }
147            Some(timepoint)
148        })
149    }
150
151    /// Returns an iterator over the [`TimePoint`]s of a [`Chunk`], for a given component.
152    ///
153    /// This is different than [`Self::iter_timepoints`] in that it will only yield timepoints for rows
154    /// at which there is data for the specified component.
155    ///
156    /// See also [`Self::iter_timepoints`].
157    pub fn iter_component_timepoints(
158        &self,
159        component: ComponentIdentifier,
160    ) -> impl Iterator<Item = TimePoint> + '_ + use<'_> {
161        let Some(list_array) = self.components.get_array(component) else {
162            return Either::Left(std::iter::empty());
163        };
164
165        if let Some(validity) = list_array.nulls() {
166            let mut timelines = self
167                .timelines
168                .values()
169                .map(|time_column| {
170                    (
171                        time_column.timeline,
172                        time_column
173                            .times()
174                            .enumerate()
175                            .filter(|(i, _)| validity.is_valid(*i))
176                            .map(|(_, time)| time),
177                    )
178                })
179                .collect_vec();
180
181            Either::Right(Either::Left(std::iter::from_fn(move || {
182                let mut timepoint = TimePoint::default();
183                for (timeline, times) in &mut timelines {
184                    timepoint.insert(*timeline, times.next()?);
185                }
186                Some(timepoint)
187            })))
188        } else {
189            let mut timelines = self
190                .timelines
191                .values()
192                .map(|time_column| (time_column.timeline, time_column.times()))
193                .collect_vec();
194
195            Either::Right(Either::Right(std::iter::from_fn(move || {
196                let mut timepoint = TimePoint::default();
197                for (timeline, times) in &mut timelines {
198                    timepoint.insert(*timeline, times.next()?);
199                }
200                Some(timepoint)
201            })))
202        }
203    }
204
205    /// Returns an iterator over the offsets & lengths of component arrays within [`Chunk`], for a given
206    /// component.
207    ///
208    /// I.e. each span describes the position of a component batch in the
209    /// underlying arrow array of values.
210    pub fn iter_component_offsets(
211        &self,
212        component: ComponentIdentifier,
213    ) -> impl Iterator<Item = Span<usize>> {
214        let Some(list_array) = self.components.get_array(component) else {
215            return Either::Left(std::iter::empty());
216        };
217
218        let offsets = list_array.offsets().iter().map(|idx| *idx as usize);
219        let lengths = list_array.offsets().lengths();
220
221        if let Some(validity) = list_array.nulls() {
222            Either::Right(Either::Left(
223                izip!(offsets, lengths)
224                    .enumerate()
225                    .filter_map(|(i, o)| validity.is_valid(i).then_some(o))
226                    .map(|(start, len)| Span { start, len }),
227            ))
228        } else {
229            Either::Right(Either::Right(
230                izip!(offsets, lengths).map(|(start, len)| Span { start, len }),
231            ))
232        }
233    }
234
235    /// Returns an iterator over the all the sliced component batches in a [`Chunk`]'s column, for
236    /// a given component.
237    ///
238    /// The generic `S` parameter will decide the type of data returned. It is _very_ permissive.
239    /// See [`ChunkComponentSlicer`] for all the available implementations.
240    ///
241    /// This is a very fast path: the entire column will be downcasted at once, and then every
242    /// component batch will be a slice reference into that global slice.
243    ///
244    /// See also [`Self::iter_slices_from_struct_field`].
245    #[inline]
246    pub fn iter_slices<'a, S: 'a + ChunkComponentSlicer>(
247        &'a self,
248        component: ComponentIdentifier,
249    ) -> impl Iterator<Item = S::Item<'a>> + 'a + use<'a, S> {
250        let Some(list_array) = self.components.get_array(component) else {
251            return Either::Left(std::iter::empty());
252        };
253
254        let component_offset_values = self.iter_component_offsets(component);
255
256        Either::Right(S::slice(
257            component,
258            &**list_array.values() as _,
259            component_offset_values,
260        ))
261    }
262
263    /// Returns an iterator over the all the sliced component batches in a [`Chunk`]'s column, for
264    /// a specific struct field of given component.
265    ///
266    /// The target component must be a `StructArray`.
267    ///
268    /// The generic `S` parameter will decide the type of data returned. It is _very_ permissive.
269    /// See [`ChunkComponentSlicer`] for all the available implementations.
270    ///
271    /// This is a very fast path: the entire column will be downcasted at once, and then every
272    /// component batch will be a slice reference into that global slice.
273    ///
274    /// See also [`Self::iter_slices_from_struct_field`].
275    pub fn iter_slices_from_struct_field<'a, S: 'a + ChunkComponentSlicer>(
276        &'a self,
277        component: ComponentIdentifier,
278        field_name: &'a str,
279    ) -> impl Iterator<Item = S::Item<'a>> + 'a {
280        let Some(list_array) = self.components.get_array(component) else {
281            return Either::Left(std::iter::empty());
282        };
283
284        let Some(struct_array) = list_array.values().downcast_array_ref::<ArrowStructArray>()
285        else {
286            error_on_downcast_failure(component, "ArrowStructArray", list_array.data_type());
287            return Either::Left(std::iter::empty());
288        };
289
290        let Some(field_idx) = struct_array
291            .fields()
292            .iter()
293            .enumerate()
294            .find_map(|(i, field)| (field.name() == field_name).then_some(i))
295        else {
296            re_log::debug_panic!("field {field_name} not found for {component}, data discarded");
297            re_log::error_once!("field {field_name} not found for {component}, data discarded");
298            return Either::Left(std::iter::empty());
299        };
300
301        if field_idx >= struct_array.num_columns() {
302            re_log::debug_panic!("field {field_name} not found for {component}, data discarded");
303            re_log::error_once!("field {field_name} not found for {component}, data discarded");
304            return Either::Left(std::iter::empty());
305        }
306
307        let component_offset_values = self.iter_component_offsets(component);
308
309        Either::Right(S::slice(
310            component,
311            struct_array.column(field_idx),
312            component_offset_values,
313        ))
314    }
315}
316
317// ---
318
/// A `ChunkComponentSlicer` knows how to efficiently slice component batches out of a Chunk column.
///
/// The implementing type only acts as a selector for the kind of data that is wanted: what
/// actually gets yielded is [`Self::Item`].
///
/// See [`Chunk::iter_slices`] and [`Chunk::iter_slices_from_struct_field`].
pub trait ChunkComponentSlicer {
    /// What the iterator returned by [`Self::slice`] yields for each component batch.
    type Item<'a>;

    /// Slices `array` into component batches, one per entry of `component_spans`.
    ///
    /// * `component`: the component being sliced. The implementations in this file only use
    ///   it to report downcast failures.
    /// * `array`: the array to slice batches out of.
    /// * `component_spans`: the offset & length of each component batch within `array`.
    ///
    /// The implementations in this file report the problem and return an empty iterator
    /// when `array` is not of the Arrow type they expect.
    fn slice<'a>(
        component: ComponentIdentifier,
        array: &'a dyn ArrowArray,
        component_spans: impl Iterator<Item = Span<usize>> + 'a,
    ) -> impl Iterator<Item = Self::Item<'a>>;
}
331
332/// The actual implementation of `impl_native_type!`, so that we don't have to work in a macro.
333fn slice_as_native<'a, P, T>(
334    component: ComponentIdentifier,
335    array: &'a dyn ArrowArray,
336    component_spans: impl Iterator<Item = Span<usize>> + 'a,
337) -> impl Iterator<Item = &'a [T]> + 'a
338where
339    P: ArrowPrimitiveType<Native = T>,
340    T: ArrowNativeType,
341{
342    let Some(values) = array.downcast_array_ref::<ArrowPrimitiveArray<P>>() else {
343        error_on_downcast_failure(component, "ArrowPrimitiveArray<T>", array.data_type());
344        return Either::Left(std::iter::empty());
345    };
346    let values = values.values().as_ref();
347
348    // NOTE: No need for validity checks here, `iter_offsets` already takes care of that.
349    Either::Right(component_spans.map(move |range| &values[range.range()]))
350}
351
// We use a macro instead of a blanket impl because this violates orphan rules.
//
// Given an `(arrow primitive type, native type)` pair, this implements `ChunkComponentSlicer`
// on the native type itself: every component batch comes back as a `&[native]` slice, with
// all the actual work delegated to `slice_as_native`.
macro_rules! impl_native_type {
    ($arrow_primitive_type:ty, $native_type:ty) => {
        impl ChunkComponentSlicer for $native_type {
            type Item<'a> = &'a [$native_type];

            fn slice<'a>(
                component: ComponentIdentifier,
                array: &'a dyn ArrowArray,
                component_spans: impl Iterator<Item = Span<usize>> + 'a,
            ) -> impl Iterator<Item = Self::Item<'a>> {
                slice_as_native::<$arrow_primitive_type, $native_type>(
                    component,
                    array,
                    component_spans,
                )
            }
        }
    };
}

// One implementation per supported primitive type (the 128-bit ones are left disabled).
impl_native_type!(arrow::array::types::UInt8Type, u8);
impl_native_type!(arrow::array::types::UInt16Type, u16);
impl_native_type!(arrow::array::types::UInt32Type, u32);
impl_native_type!(arrow::array::types::UInt64Type, u64);
// impl_native_type!(arrow::array::types::UInt128Type, u128);
impl_native_type!(arrow::array::types::Int8Type, i8);
impl_native_type!(arrow::array::types::Int16Type, i16);
impl_native_type!(arrow::array::types::Int32Type, i32);
impl_native_type!(arrow::array::types::Int64Type, i64);
// impl_native_type!(arrow::array::types::Int128Type, i128);
impl_native_type!(arrow::array::types::Float16Type, half::f16);
impl_native_type!(arrow::array::types::Float32Type, f32);
impl_native_type!(arrow::array::types::Float64Type, f64);
386
387/// The actual implementation of `impl_array_native_type!`, so that we don't have to work in a macro.
388fn slice_as_array_native<'a, const N: usize, P, T>(
389    component: ComponentIdentifier,
390    array: &'a dyn ArrowArray,
391    component_spans: impl Iterator<Item = Span<usize>> + 'a,
392) -> impl Iterator<Item = &'a [[T; N]]> + 'a
393where
394    [T; N]: bytemuck::Pod,
395    P: ArrowPrimitiveType<Native = T>,
396    T: ArrowNativeType + bytemuck::Pod,
397{
398    let Some(fixed_size_list_array) = array.downcast_array_ref::<ArrowFixedSizeListArray>() else {
399        error_on_downcast_failure(component, "ArrowFixedSizeListArray", array.data_type());
400        return Either::Left(std::iter::empty());
401    };
402
403    let Some(values) = fixed_size_list_array
404        .values()
405        .downcast_array_ref::<ArrowPrimitiveArray<P>>()
406    else {
407        error_on_downcast_failure(
408            component,
409            "ArrowPrimitiveArray<P>",
410            fixed_size_list_array.data_type(),
411        );
412        return Either::Left(std::iter::empty());
413    };
414
415    let size = fixed_size_list_array.value_length() as usize;
416    let values = values.values().as_ref();
417
418    // NOTE: No need for validity checks here, `component_spans` already takes care of that.
419    Either::Right(
420        component_spans.map(move |span| bytemuck::cast_slice(&values[(span * size).range()])),
421    )
422}
423
// We use a macro instead of a blanket impl because this violates orphan rules.
//
// Given an `(arrow primitive type, native type)` pair, this implements `ChunkComponentSlicer`
// on `[native; N]` for every `N` whose array type is `bytemuck::Pod`: every component batch
// comes back as a `&[[native; N]]` slice, with all the actual work delegated to
// `slice_as_array_native`.
macro_rules! impl_array_native_type {
    ($arrow_primitive_type:ty, $native_type:ty) => {
        impl<const N: usize> ChunkComponentSlicer for [$native_type; N]
        where
            [$native_type; N]: bytemuck::Pod,
        {
            type Item<'a> = &'a [[$native_type; N]];

            fn slice<'a>(
                component: ComponentIdentifier,
                array: &'a dyn ArrowArray,
                component_spans: impl Iterator<Item = Span<usize>> + 'a,
            ) -> impl Iterator<Item = Self::Item<'a>> {
                slice_as_array_native::<N, $arrow_primitive_type, $native_type>(
                    component,
                    array,
                    component_spans,
                )
            }
        }
    };
}

// One implementation per supported primitive type (the 128-bit ones are left disabled).
impl_array_native_type!(arrow::array::types::UInt8Type, u8);
impl_array_native_type!(arrow::array::types::UInt16Type, u16);
impl_array_native_type!(arrow::array::types::UInt32Type, u32);
impl_array_native_type!(arrow::array::types::UInt64Type, u64);
// impl_array_native_type!(arrow::array::types::UInt128Type, u128);
impl_array_native_type!(arrow::array::types::Int8Type, i8);
impl_array_native_type!(arrow::array::types::Int16Type, i16);
impl_array_native_type!(arrow::array::types::Int32Type, i32);
impl_array_native_type!(arrow::array::types::Int64Type, i64);
// impl_array_native_type!(arrow::array::types::Int128Type, i128);
impl_array_native_type!(arrow::array::types::Float16Type, half::f16);
impl_array_native_type!(arrow::array::types::Float32Type, f32);
impl_array_native_type!(arrow::array::types::Float64Type, f64);
461
462/// The actual implementation of `impl_buffer_native_type!`, so that we don't have to work in a macro.
463fn slice_as_buffer_native<'a, P, T>(
464    component: ComponentIdentifier,
465    array: &'a dyn ArrowArray,
466    component_spans: impl Iterator<Item = Span<usize>> + 'a,
467) -> impl Iterator<Item = Vec<ArrowScalarBuffer<T>>> + 'a
468where
469    P: ArrowPrimitiveType<Native = T>,
470    T: ArrowNativeType,
471{
472    let Some(inner_list_array) = array.downcast_array_ref::<ArrowListArray>() else {
473        error_on_downcast_failure(component, "ArrowListArray", array.data_type());
474        return Either::Left(std::iter::empty());
475    };
476
477    let Some(values) = inner_list_array
478        .values()
479        .downcast_array_ref::<ArrowPrimitiveArray<P>>()
480    else {
481        error_on_downcast_failure(
482            component,
483            "ArrowPrimitiveArray<P>",
484            inner_list_array.data_type(),
485        );
486        return Either::Left(std::iter::empty());
487    };
488
489    let values = values.values();
490    let offsets = inner_list_array.offsets();
491    let lengths = offsets.lengths().collect_vec();
492
493    // NOTE: No need for validity checks here, `component_spans` already takes care of that.
494    Either::Right(component_spans.map(move |span| {
495        let offsets = &offsets[span.range()];
496        let lengths = &lengths[span.range()];
497        izip!(offsets, lengths)
498            // NOTE: Not an actual clone, just a refbump of the underlying buffer.
499            .map(|(&idx, &len)| values.clone().slice(idx as _, len))
500            .collect_vec()
501    }))
502}
503
504// We special case `&[u8]` so that it works both for `List[u8]` and `Binary/LargeBinary` arrays.
505fn slice_as_u8<'a>(
506    component: ComponentIdentifier,
507    array: &'a dyn ArrowArray,
508    component_spans: impl Iterator<Item = Span<usize>> + 'a,
509) -> impl Iterator<Item = Vec<Buffer>> + 'a {
510    if let Some(binary_array) = array.downcast_array_ref::<BinaryArray>() {
511        let values = binary_array.values();
512        let offsets = binary_array.offsets();
513        let lengths = offsets.lengths().collect_vec();
514
515        // NOTE: No need for validity checks here, `component_spans` already takes care of that.
516        Either::Left(Either::Left(component_spans.map(move |span| {
517            let offsets = &offsets[span.range()];
518            let lengths = &lengths[span.range()];
519            izip!(offsets, lengths)
520                // NOTE: Not an actual clone, just a refbump of the underlying buffer.
521                .map(|(&idx, &len)| values.clone().slice_with_length(idx as _, len))
522                .collect_vec()
523        })))
524    } else if let Some(binary_array) = array.downcast_array_ref::<LargeBinaryArray>() {
525        let values = binary_array.values();
526        let offsets = binary_array.offsets();
527        let lengths = offsets.lengths().collect_vec();
528
529        // NOTE: No need for validity checks here, `component_spans` already takes care of that.
530        Either::Left(Either::Right(component_spans.map(move |span| {
531            let offsets = &offsets[span.range()];
532            let lengths = &lengths[span.range()];
533            izip!(offsets, lengths)
534                // NOTE: Not an actual clone, just a refbump of the underlying buffer.
535                .map(|(&idx, &len)| values.clone().slice_with_length(idx as _, len))
536                .collect_vec()
537        })))
538    } else {
539        Either::Right(
540            slice_as_buffer_native::<arrow::array::types::UInt8Type, u8>(
541                component,
542                array,
543                component_spans,
544            )
545            .map(|scalar_buffers| {
546                scalar_buffers
547                    .into_iter()
548                    .map(|scalar_buffer| scalar_buffer.into_inner())
549                    .collect_vec()
550            }),
551        )
552    }
553}
554
// We use a macro instead of a blanket impl because this violates orphan rules.
//
// Given an `(arrow primitive type, native type)` pair, this implements `ChunkComponentSlicer`
// on `&[native]`: every component batch comes back as a `Vec` of scalar buffers, with all
// the actual work delegated to `slice_as_buffer_native`.
macro_rules! impl_buffer_native_type {
    ($primitive_type:ty, $native_type:ty) => {
        impl ChunkComponentSlicer for &[$native_type] {
            type Item<'a> = Vec<ArrowScalarBuffer<$native_type>>;

            fn slice<'a>(
                component: ComponentIdentifier,
                array: &'a dyn ArrowArray,
                component_spans: impl Iterator<Item = Span<usize>> + 'a,
            ) -> impl Iterator<Item = Self::Item<'a>> {
                slice_as_buffer_native::<$primitive_type, $native_type>(
                    component,
                    array,
                    component_spans,
                )
            }
        }
    };
}
575
// We special case `&[u8]` so that it works both for `List[u8]` and `Binary` arrays.
//
// `LargeBinary` arrays are covered as well: see `slice_as_u8`, which does all the work.
// Unlike the other `&[T]` slicers, the batches are made of untyped `Buffer`s rather than
// scalar buffers.
impl ChunkComponentSlicer for &[u8] {
    type Item<'a> = Vec<Buffer>;

    fn slice<'a>(
        component: ComponentIdentifier,
        array: &'a dyn ArrowArray,
        component_spans: impl Iterator<Item = Span<usize>> + 'a,
    ) -> impl Iterator<Item = Self::Item<'a>> {
        slice_as_u8(component, array, component_spans)
    }
}
588
// One implementation per supported primitive type. `u8` is deliberately absent: `&[u8]` has
// its own hand-written implementation. The 128-bit types are left disabled.
impl_buffer_native_type!(arrow::array::types::UInt16Type, u16);
impl_buffer_native_type!(arrow::array::types::UInt32Type, u32);
impl_buffer_native_type!(arrow::array::types::UInt64Type, u64);
// impl_buffer_native_type!(arrow::array::types::UInt128Type, u128);
impl_buffer_native_type!(arrow::array::types::Int8Type, i8);
impl_buffer_native_type!(arrow::array::types::Int16Type, i16);
impl_buffer_native_type!(arrow::array::types::Int32Type, i32);
impl_buffer_native_type!(arrow::array::types::Int64Type, i64);
// impl_buffer_native_type!(arrow::array::types::Int128Type, i128);
impl_buffer_native_type!(arrow::array::types::Float16Type, half::f16);
impl_buffer_native_type!(arrow::array::types::Float32Type, f32);
impl_buffer_native_type!(arrow::array::types::Float64Type, f64);
601
602/// The actual implementation of `impl_array_list_native_type!`, so that we don't have to work in a macro.
603fn slice_as_array_list_native<'a, const N: usize, P, T>(
604    component: ComponentIdentifier,
605    array: &'a dyn ArrowArray,
606    component_spans: impl Iterator<Item = Span<usize>> + 'a,
607) -> impl Iterator<Item = Vec<&'a [[T; N]]>> + 'a
608where
609    [T; N]: bytemuck::Pod,
610    P: ArrowPrimitiveType<Native = T>,
611    T: ArrowNativeType + bytemuck::Pod,
612{
613    let Some(inner_list_array) = array.downcast_array_ref::<ArrowListArray>() else {
614        error_on_downcast_failure(component, "ArrowListArray", array.data_type());
615        return Either::Left(std::iter::empty());
616    };
617
618    let inner_offsets = inner_list_array.offsets();
619    let inner_lengths = inner_offsets.lengths().collect_vec();
620
621    let Some(fixed_size_list_array) = inner_list_array
622        .values()
623        .downcast_array_ref::<ArrowFixedSizeListArray>()
624    else {
625        error_on_downcast_failure(
626            component,
627            "ArrowFixedSizeListArray",
628            inner_list_array.data_type(),
629        );
630        return Either::Left(std::iter::empty());
631    };
632
633    let Some(values) = fixed_size_list_array
634        .values()
635        .downcast_array_ref::<ArrowPrimitiveArray<P>>()
636    else {
637        error_on_downcast_failure(
638            component,
639            "ArrowPrimitiveArray<P>",
640            fixed_size_list_array.data_type(),
641        );
642        return Either::Left(std::iter::empty());
643    };
644
645    let size = fixed_size_list_array.value_length() as usize;
646    let values = values.values();
647
648    // NOTE: No need for validity checks here, `iter_offsets` already takes care of that.
649    Either::Right(component_spans.map(move |span| {
650        let inner_offsets = &inner_offsets[span.range()];
651        let inner_lengths = &inner_lengths[span.range()];
652        izip!(inner_offsets, inner_lengths)
653            .map(|(&idx, &len)| {
654                let idx = idx as usize;
655                bytemuck::cast_slice(&values[idx * size..idx * size + len * size])
656            })
657            .collect_vec()
658    }))
659}
660
// We use a macro instead of a blanket impl because this violates orphan rules.
//
// Given an `(arrow primitive type, native type)` pair, this implements `ChunkComponentSlicer`
// on `&[[native; N]]` for every `N` whose array type is `bytemuck::Pod`: every component
// batch comes back as a `Vec<&[[native; N]]>`, with all the actual work delegated to
// `slice_as_array_list_native`.
macro_rules! impl_array_list_native_type {
    ($primitive_type:ty, $native_type:ty) => {
        impl<const N: usize> ChunkComponentSlicer for &[[$native_type; N]]
        where
            [$native_type; N]: bytemuck::Pod,
        {
            type Item<'a> = Vec<&'a [[$native_type; N]]>;

            fn slice<'a>(
                component: ComponentIdentifier,
                array: &'a dyn ArrowArray,
                component_spans: impl Iterator<Item = Span<usize>> + 'a,
            ) -> impl Iterator<Item = Self::Item<'a>> {
                slice_as_array_list_native::<N, $primitive_type, $native_type>(
                    component,
                    array,
                    component_spans,
                )
            }
        }
    };
}

// One implementation per supported primitive type (the 128-bit ones are left disabled).
impl_array_list_native_type!(arrow::array::types::UInt8Type, u8);
impl_array_list_native_type!(arrow::array::types::UInt16Type, u16);
impl_array_list_native_type!(arrow::array::types::UInt32Type, u32);
impl_array_list_native_type!(arrow::array::types::UInt64Type, u64);
// impl_array_list_native_type!(arrow::array::types::UInt128Type, u128);
impl_array_list_native_type!(arrow::array::types::Int8Type, i8);
impl_array_list_native_type!(arrow::array::types::Int16Type, i16);
impl_array_list_native_type!(arrow::array::types::Int32Type, i32);
impl_array_list_native_type!(arrow::array::types::Int64Type, i64);
// impl_array_list_native_type!(arrow::array::types::Int128Type, i128);
impl_array_list_native_type!(arrow::array::types::Float16Type, half::f16);
impl_array_list_native_type!(arrow::array::types::Float32Type, f32);
impl_array_list_native_type!(arrow::array::types::Float64Type, f64);
698
699impl ChunkComponentSlicer for String {
700    type Item<'a> = Vec<ArrowString>;
701
702    fn slice<'a>(
703        component: ComponentIdentifier,
704        array: &'a dyn ArrowArray,
705        component_spans: impl Iterator<Item = Span<usize>> + 'a,
706    ) -> impl Iterator<Item = Vec<ArrowString>> {
707        let Some(utf8_array) = array.downcast_array_ref::<ArrowStringArray>() else {
708            error_on_downcast_failure(component, "ArrowStringArray", array.data_type());
709            return Either::Left(std::iter::empty());
710        };
711
712        let values = utf8_array.values().clone();
713        let offsets = utf8_array.offsets().clone();
714        let lengths = offsets.lengths().collect_vec();
715
716        // NOTE: No need for validity checks here, `component_spans` already takes care of that.
717        Either::Right(component_spans.map(move |range| {
718            let offsets = &offsets[range.range()];
719            let lengths = &lengths[range.range()];
720            izip!(offsets, lengths)
721                .map(|(&idx, &len)| ArrowString::from(values.slice_with_length(idx as _, len)))
722                .collect_vec()
723        }))
724    }
725}
726
727impl ChunkComponentSlicer for bool {
728    type Item<'a> = ArrowBooleanBuffer;
729
730    fn slice<'a>(
731        component: ComponentIdentifier,
732        array: &'a dyn ArrowArray,
733        component_spans: impl Iterator<Item = Span<usize>> + 'a,
734    ) -> impl Iterator<Item = Self::Item<'a>> {
735        let Some(values) = array.downcast_array_ref::<ArrowBooleanArray>() else {
736            error_on_downcast_failure(component, "ArrowBooleanArray", array.data_type());
737            return Either::Left(std::iter::empty());
738        };
739        let values = values.values().clone();
740
741        // NOTE: No need for validity checks here, `component_spans` already takes care of that.
742        Either::Right(
743            component_spans.map(move |Span { start, len }| values.clone().slice(start, len)),
744        )
745    }
746}
747
748// ---
749
/// Owning iterator over the `(TimeInt, RowId)` indices of a [`Chunk`].
///
/// Built by [`Chunk::iter_indices_owned`]. It keeps the chunk alive through an `Arc`, and
/// therefore borrows nothing.
pub struct ChunkIndicesIter {
    /// The chunk whose row IDs are being walked.
    chunk: Arc<Chunk>,

    /// The time column to read times from, or `None` to pair every row with
    /// [`TimeInt::STATIC`].
    time_column: Option<TimeColumn>,
    /// Position of the next row to yield.
    index: usize,
}
756
757impl Iterator for ChunkIndicesIter {
758    type Item = (TimeInt, RowId);
759
760    fn next(&mut self) -> Option<Self::Item> {
761        let i = self.index;
762        self.index += 1;
763
764        let row_id = *self.chunk.row_ids_slice().get(i)?;
765
766        if let Some(time_column) = &self.time_column {
767            let time = *time_column.times_raw().get(i)?;
768            let time = TimeInt::new_temporal(time);
769            Some((time, row_id))
770        } else {
771            Some((TimeInt::STATIC, row_id))
772        }
773    }
774}
775
776impl Chunk {
777    /// Returns an iterator over the indices (`(TimeInt, RowId)`) of a [`Chunk`], for a given timeline.
778    ///
779    /// If the chunk is static, `timeline` will be ignored.
780    ///
781    /// The returned iterator outlives `self`, thus it can be passed around freely.
782    /// The tradeoff is that `self` must be an `Arc`.
783    ///
784    /// See also [`Self::iter_indices`].
785    #[inline]
786    pub fn iter_indices_owned(
787        self: Arc<Self>,
788        timeline: &TimelineName,
789    ) -> impl Iterator<Item = (TimeInt, RowId)> + use<> {
790        if self.is_static() {
791            Either::Left(ChunkIndicesIter {
792                chunk: self,
793                time_column: None,
794                index: 0,
795            })
796        } else {
797            self.timelines.get(timeline).cloned().map_or_else(
798                || Either::Right(Either::Left(std::iter::empty())),
799                |time_column| {
800                    Either::Right(Either::Right(ChunkIndicesIter {
801                        chunk: self,
802                        time_column: Some(time_column),
803                        index: 0,
804                    }))
805                },
806            )
807        }
808    }
809}
810
811// ---
812
/// The actual iterator implementation for [`Chunk::iter_component`].
pub struct ChunkComponentIter<C, IO> {
    /// The deserialized values, shared with every item yielded by this iterator.
    values: Arc<Vec<C>>,
    /// Source of the spans into `values`: one item is yielded per span.
    offsets: IO,
}
818
/// The underlying item type for [`ChunkComponentIter`].
///
/// This allows us to cheaply carry slices of deserialized data, while working around the
/// limitations of Rust's Iterator trait and ecosystem.
///
/// See [`ChunkComponentIterItem::as_slice`].
//
// NOTE: The derived `PartialEq` compares `values` and `span` field by field, whereas the
// `PartialEq<[C]>` and `PartialEq<Vec<C>>` impls compare the contents of `as_slice()`.
#[derive(Clone, PartialEq)]
pub struct ChunkComponentIterItem<C> {
    /// Shared backing storage that [`Self::as_slice`] indexes into.
    values: Arc<Vec<C>>,
    /// The range of `values` that this item exposes.
    span: Span<usize>,
}
830
831impl<C: PartialEq> PartialEq<[C]> for ChunkComponentIterItem<C> {
832    fn eq(&self, rhs: &[C]) -> bool {
833        self.as_slice().eq(rhs)
834    }
835}
836
837impl<C: PartialEq> PartialEq<Vec<C>> for ChunkComponentIterItem<C> {
838    fn eq(&self, rhs: &Vec<C>) -> bool {
839        self.as_slice().eq(rhs)
840    }
841}
842
/// Marker impl: item equality is a full equivalence relation whenever it is one for `C`.
impl<C: Eq> Eq for ChunkComponentIterItem<C> {}
844
845// NOTE: No `C: Default`!
846impl<C> Default for ChunkComponentIterItem<C> {
847    #[inline]
848    fn default() -> Self {
849        Self {
850            values: Arc::new(Vec::new()),
851            span: Span::default(),
852        }
853    }
854}
855
856impl<C> ChunkComponentIterItem<C> {
857    #[inline]
858    pub fn as_slice(&self) -> &[C] {
859        &self.values[self.span.range()]
860    }
861}
862
863impl<C> std::ops::Deref for ChunkComponentIterItem<C> {
864    type Target = [C];
865
866    #[inline]
867    fn deref(&self) -> &Self::Target {
868        self.as_slice()
869    }
870}
871
872impl<C: Component, IO: Iterator<Item = Span<usize>>> Iterator for ChunkComponentIter<C, IO> {
873    type Item = ChunkComponentIterItem<C>;
874
875    #[inline]
876    fn next(&mut self) -> Option<Self::Item> {
877        self.offsets.next().map(move |span| ChunkComponentIterItem {
878            values: Arc::clone(&self.values),
879            span,
880        })
881    }
882}
883
884impl Chunk {
885    /// Returns an iterator over the deserialized batches of a [`Chunk`], for a given component.
886    ///
887    /// This is a dedicated fast path: the entire column will be downcasted and deserialized at
888    /// once, and then every component batch will be a slice reference into that global slice.
889    /// Use this when working with complex arrow datatypes and performance matters (e.g. ranging
890    /// through enum types across many timestamps).
891    ///
892    /// TODO(#5305): Note that, while this is much faster than deserializing each row individually,
893    /// this still uses the old codegen'd deserialization path, which does some very unidiomatic Arrow
894    /// things, and is therefore very slow at the moment. Avoid this on performance critical paths.
895    ///
896    /// See also:
897    /// * [`Self::iter_slices`]
898    /// * [`Self::iter_slices_from_struct_field`]
899    #[inline]
900    pub fn iter_component<C: Component>(
901        &self,
902        component: ComponentIdentifier,
903    ) -> ChunkComponentIter<C, impl Iterator<Item = Span<usize>> + '_ + use<'_, C>> {
904        let Some(list_array) = self.components.get_array(component) else {
905            return ChunkComponentIter {
906                values: Arc::new(vec![]),
907                offsets: Either::Left(std::iter::empty()),
908            };
909        };
910
911        let values = arrow::array::ArrayRef::from(list_array.values().clone());
912        let values = match C::from_arrow(&values) {
913            Ok(values) => values,
914            Err(err) => {
915                re_log::debug_panic!(
916                    "deserialization failed for {}, data discarded: {}",
917                    C::name(),
918                    re_error::format_ref(&err),
919                );
920
921                re_log::error_once!(
922                    "deserialization failed for {}, data discarded: {}",
923                    C::name(),
924                    re_error::format_ref(&err),
925                );
926
927                return ChunkComponentIter {
928                    values: Arc::new(vec![]),
929                    offsets: Either::Left(std::iter::empty()),
930                };
931            }
932        };
933
934        // NOTE: No need for validity checks here, `iter_offsets` already takes care of that.
935        ChunkComponentIter {
936            values: Arc::new(values),
937            offsets: Either::Right(self.iter_component_offsets(component)),
938        }
939    }
940}
941
942// ---
943
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use itertools::{Itertools as _, izip};
    use re_log_types::example_components::{MyPoint, MyPoints};
    use re_log_types::{EntityPath, TimeInt, TimePoint};

    use crate::{Chunk, RowId, Timeline};

    /// On a temporal chunk, `iter_indices_owned` must pair every time of the requested timeline
    /// with the matching row ID.
    #[test]
    fn iter_indices_temporal() -> anyhow::Result<()> {
        // Five rows, logged at frames 1, 3, 5, 7 and 9, each holding a single point.
        let row_ids: [RowId; 5] = std::array::from_fn(|_| RowId::new());

        let timeline = Timeline::new_sequence("frame");
        let timepoints = [1, 3, 5, 7, 9].map(|frame| [(timeline, frame)]);

        let points = [1.0, 2.0, 3.0, 4.0, 5.0].map(|v| [MyPoint::new(v, v)]);

        let mut builder = Chunk::builder(EntityPath::from("this/that"));
        for (row_id, timepoint, batch) in izip!(row_ids, timepoints, &points) {
            builder = builder.with_component_batches(
                row_id,
                timepoint,
                [(MyPoints::descriptor_points(), batch as _)],
            );
        }
        let chunk = Arc::new(builder.build()?);

        let got = Arc::clone(&chunk)
            .iter_indices_owned(timeline.name())
            .collect_vec();

        let expected_times = chunk
            .timelines
            .get(timeline.name())
            .map(|time_column| time_column.times().collect_vec())
            .unwrap_or_default();
        let expected = izip!(expected_times, chunk.row_ids()).collect_vec();

        similar_asserts::assert_eq!(expected, got);

        Ok(())
    }

    /// On a static chunk, `iter_indices_owned` must ignore the timeline and pair every row ID
    /// with `TimeInt::STATIC`.
    #[test]
    fn iter_indices_static() -> anyhow::Result<()> {
        // Five rows without any time information, each holding a single point.
        let row_ids: [RowId; 5] = std::array::from_fn(|_| RowId::new());

        let timeline = Timeline::new_sequence("frame");

        let points = [1.0, 2.0, 3.0, 4.0, 5.0].map(|v| [MyPoint::new(v, v)]);

        let mut builder = Chunk::builder(EntityPath::from("this/that"));
        for (row_id, batch) in izip!(row_ids, &points) {
            builder = builder.with_component_batches(
                row_id,
                TimePoint::default(),
                [(MyPoints::descriptor_points(), batch as _)],
            );
        }
        let chunk = Arc::new(builder.build()?);

        let got = Arc::clone(&chunk)
            .iter_indices_owned(timeline.name())
            .collect_vec();

        let expected = izip!(std::iter::repeat(TimeInt::STATIC), chunk.row_ids()).collect_vec();

        similar_asserts::assert_eq!(expected, got);

        Ok(())
    }
}