re_chunk/
iter.rs

1use std::sync::Arc;
2
3use arrow::{
4    array::{
5        Array as ArrowArray, ArrayRef as ArrowArrayRef, ArrowPrimitiveType, BinaryArray,
6        BooleanArray as ArrowBooleanArray, FixedSizeListArray as ArrowFixedSizeListArray,
7        LargeBinaryArray, ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray,
8        StringArray as ArrowStringArray, StructArray as ArrowStructArray,
9    },
10    buffer::{BooleanBuffer as ArrowBooleanBuffer, Buffer, ScalarBuffer as ArrowScalarBuffer},
11    datatypes::ArrowNativeType,
12};
13use itertools::{Either, Itertools as _, izip};
14
15use re_arrow_util::ArrowArrayDowncastRef as _;
16use re_log_types::{TimeInt, TimePoint, TimelineName};
17use re_span::Span;
18use re_types_core::{ArrowString, Component, ComponentDescriptor};
19
20use crate::{Chunk, RowId, TimeColumn};
21
22// ---
23
24// NOTE: Regarding the use of (recursive) `Either` in this file: it is _not_ arbitrary.
25//
26// They _should_ all follow this model:
27// * The first layer is always the emptiness layer: `Left` is empty, `Right` is non-empty.
// * The second layer is the temporality layer: `Left` is static, `Right` is temporal.
29// * Any layers beyond that follow the same pattern: `Left` doesn't have something, while `Right` does.
30
31impl Chunk {
32    /// Return the raw component list array values for a given component.
33    ///
34    /// Use with great care: Component data may have arbitrary gaps.
35    pub fn raw_component_array(
36        &self,
37        component_descr: &ComponentDescriptor,
38    ) -> Option<&ArrowArrayRef> {
39        self.components
40            .get(component_descr)
41            .map(|list_array| list_array.values())
42    }
43
44    /// Returns an iterator over the indices (`(TimeInt, RowId)`) of a [`Chunk`], for a given timeline.
45    ///
46    /// If the chunk is static, `timeline` will be ignored.
47    ///
48    /// See also:
49    /// * [`Self::iter_component_indices`].
50    /// * [`Self::iter_indices_owned`].
51    #[inline]
52    pub fn iter_indices(
53        &self,
54        timeline: &TimelineName,
55    ) -> impl Iterator<Item = (TimeInt, RowId)> + '_ + use<'_> {
56        if self.is_static() {
57            Either::Right(Either::Left(izip!(
58                std::iter::repeat(TimeInt::STATIC),
59                self.row_ids()
60            )))
61        } else {
62            let Some(time_column) = self.timelines.get(timeline) else {
63                return Either::Left(std::iter::empty());
64            };
65
66            Either::Right(Either::Right(izip!(time_column.times(), self.row_ids())))
67        }
68    }
69
70    /// Returns an iterator over the indices (`(TimeInt, RowId)`) of a [`Chunk`], for a given
71    /// timeline and component.
72    ///
73    /// If the chunk is static, `timeline` will be ignored.
74    ///
75    /// This is different than [`Self::iter_indices`] in that it will only yield indices for rows
76    /// at which there is data for the specified component.
77    ///
78    /// See also [`Self::iter_indices`].
79    pub fn iter_component_indices(
80        &self,
81        timeline: &TimelineName,
82        component_descr: &ComponentDescriptor,
83    ) -> impl Iterator<Item = (TimeInt, RowId)> + '_ + use<'_> {
84        let Some(list_array) = self.components.get(component_descr) else {
85            return Either::Left(std::iter::empty());
86        };
87
88        if self.is_static() {
89            let indices = izip!(std::iter::repeat(TimeInt::STATIC), self.row_ids());
90
91            if let Some(validity) = list_array.nulls() {
92                Either::Right(Either::Left(Either::Left(
93                    indices
94                        .enumerate()
95                        .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
96                )))
97            } else {
98                Either::Right(Either::Left(Either::Right(indices)))
99            }
100        } else {
101            let Some(time_column) = self.timelines.get(timeline) else {
102                return Either::Left(std::iter::empty());
103            };
104
105            let indices = izip!(time_column.times(), self.row_ids());
106
107            if let Some(validity) = list_array.nulls() {
108                Either::Right(Either::Right(Either::Left(
109                    indices
110                        .enumerate()
111                        .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
112                )))
113            } else {
114                Either::Right(Either::Right(Either::Right(indices)))
115            }
116        }
117    }
118
119    /// Returns an iterator over the [`TimePoint`]s of a [`Chunk`].
120    ///
121    /// See also:
122    /// * [`Self::iter_component_timepoints`].
123    #[inline]
124    pub fn iter_timepoints(&self) -> impl Iterator<Item = TimePoint> + '_ {
125        let mut timelines = self
126            .timelines
127            .values()
128            .map(|time_column| (time_column.timeline, time_column.times()))
129            .collect_vec();
130
131        std::iter::from_fn(move || {
132            let mut timepoint = TimePoint::default();
133            for (timeline, times) in &mut timelines {
134                timepoint.insert(*timeline, times.next()?);
135            }
136            Some(timepoint)
137        })
138    }
139
140    /// Returns an iterator over the [`TimePoint`]s of a [`Chunk`], for a given component.
141    ///
142    /// This is different than [`Self::iter_timepoints`] in that it will only yield timepoints for rows
143    /// at which there is data for the specified component.
144    ///
145    /// See also [`Self::iter_timepoints`].
146    pub fn iter_component_timepoints(
147        &self,
148        component_descr: &ComponentDescriptor,
149    ) -> impl Iterator<Item = TimePoint> + '_ + use<'_> {
150        let Some(list_array) = self.components.get(component_descr) else {
151            return Either::Left(std::iter::empty());
152        };
153
154        if let Some(validity) = list_array.nulls() {
155            let mut timelines = self
156                .timelines
157                .values()
158                .map(|time_column| {
159                    (
160                        time_column.timeline,
161                        time_column
162                            .times()
163                            .enumerate()
164                            .filter(|(i, _)| validity.is_valid(*i))
165                            .map(|(_, time)| time),
166                    )
167                })
168                .collect_vec();
169
170            Either::Right(Either::Left(std::iter::from_fn(move || {
171                let mut timepoint = TimePoint::default();
172                for (timeline, times) in &mut timelines {
173                    timepoint.insert(*timeline, times.next()?);
174                }
175                Some(timepoint)
176            })))
177        } else {
178            let mut timelines = self
179                .timelines
180                .values()
181                .map(|time_column| (time_column.timeline, time_column.times()))
182                .collect_vec();
183
184            Either::Right(Either::Right(std::iter::from_fn(move || {
185                let mut timepoint = TimePoint::default();
186                for (timeline, times) in &mut timelines {
187                    timepoint.insert(*timeline, times.next()?);
188                }
189                Some(timepoint)
190            })))
191        }
192    }
193
194    /// Returns an iterator over the offsets (`(offset, len)`) of a [`Chunk`], for a given
195    /// component.
196    ///
197    /// I.e. each `(offset, len)` pair describes the position of a component batch in the
198    /// underlying arrow array of values.
199    pub fn iter_component_offsets<'a>(
200        &'a self,
201        component_descriptor: &ComponentDescriptor,
202    ) -> impl Iterator<Item = Span<usize>> + 'a + use<'a> {
203        let Some(list_array) = self.components.get(component_descriptor) else {
204            return Either::Left(std::iter::empty());
205        };
206
207        let offsets = list_array.offsets().iter().map(|idx| *idx as usize);
208        let lengths = list_array.offsets().lengths();
209
210        if let Some(validity) = list_array.nulls() {
211            Either::Right(Either::Left(
212                izip!(offsets, lengths)
213                    .enumerate()
214                    .filter_map(|(i, o)| validity.is_valid(i).then_some(o))
215                    .map(|(start, len)| Span { start, len }),
216            ))
217        } else {
218            Either::Right(Either::Right(
219                izip!(offsets, lengths).map(|(start, len)| Span { start, len }),
220            ))
221        }
222    }
223
224    /// Returns an iterator over the all the sliced component batches in a [`Chunk`]'s column, for
225    /// a given component.
226    ///
227    /// The generic `S` parameter will decide the type of data returned. It is _very_ permissive.
228    /// See [`ChunkComponentSlicer`] for all the available implementations.
229    ///
230    /// This is a very fast path: the entire column will be downcasted at once, and then every
231    /// component batch will be a slice reference into that global slice.
232    ///
233    /// See also [`Self::iter_slices_from_struct_field`].
234    #[inline]
235    pub fn iter_slices<'a, S: 'a + ChunkComponentSlicer>(
236        &'a self,
237        component_descriptor: ComponentDescriptor,
238    ) -> impl Iterator<Item = S::Item<'a>> + 'a + use<'a, S> {
239        let Some(list_array) = self.components.get(&component_descriptor) else {
240            return Either::Left(std::iter::empty());
241        };
242
243        let component_offset_values = self.iter_component_offsets(&component_descriptor);
244
245        Either::Right(S::slice(
246            component_descriptor,
247            &**list_array.values() as _,
248            component_offset_values,
249        ))
250    }
251
252    /// Returns an iterator over the all the sliced component batches in a [`Chunk`]'s column, for
253    /// a specific struct field of given component.
254    ///
255    /// The target component must be a `StructArray`.
256    ///
257    /// The generic `S` parameter will decide the type of data returned. It is _very_ permissive.
258    /// See [`ChunkComponentSlicer`] for all the available implementations.
259    ///
260    /// This is a very fast path: the entire column will be downcasted at once, and then every
261    /// component batch will be a slice reference into that global slice.
262    ///
263    /// See also [`Self::iter_slices_from_struct_field`].
264    pub fn iter_slices_from_struct_field<'a, S: 'a + ChunkComponentSlicer>(
265        &'a self,
266        component_descriptor: ComponentDescriptor,
267        field_name: &'a str,
268    ) -> impl Iterator<Item = S::Item<'a>> + 'a {
269        let Some(list_array) = self.components.get(&component_descriptor) else {
270            return Either::Left(std::iter::empty());
271        };
272
273        let Some(struct_array) = list_array.values().downcast_array_ref::<ArrowStructArray>()
274        else {
275            if cfg!(debug_assertions) {
276                panic!("downcast failed for {component_descriptor}, data discarded");
277            } else {
278                re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
279            }
280            return Either::Left(std::iter::empty());
281        };
282
283        let Some(field_idx) = struct_array
284            .fields()
285            .iter()
286            .enumerate()
287            .find_map(|(i, field)| (field.name() == field_name).then_some(i))
288        else {
289            if cfg!(debug_assertions) {
290                panic!("field {field_name} not found for {component_descriptor}, data discarded");
291            } else {
292                re_log::error_once!(
293                    "field {field_name} not found for {component_descriptor}, data discarded"
294                );
295            }
296            return Either::Left(std::iter::empty());
297        };
298
299        if field_idx >= struct_array.num_columns() {
300            if cfg!(debug_assertions) {
301                panic!("field {field_name} not found for {component_descriptor}, data discarded");
302            } else {
303                re_log::error_once!(
304                    "field {field_name} not found for {component_descriptor}, data discarded"
305                );
306                return Either::Left(std::iter::empty());
307            }
308        }
309
310        let component_offset_values = self.iter_component_offsets(&component_descriptor);
311
312        Either::Right(S::slice(
313            component_descriptor,
314            struct_array.column(field_idx),
315            component_offset_values,
316        ))
317    }
318}
319
320// ---
321
/// A `ChunkComponentSlicer` knows how to efficiently slice component batches out of a Chunk column.
///
/// Implementors act as type-level selectors: the `S` type parameter of [`Chunk::iter_slices`]
/// and [`Chunk::iter_slices_from_struct_field`] picks an implementation, which in turn decides
/// the type of the yielded items ([`Self::Item`]).
///
/// See [`Chunk::iter_slices`] and [`Chunk::iter_slices_from_struct_field`].
pub trait ChunkComponentSlicer {
    /// The type of one sliced component batch; it may borrow from the sliced array for `'a`.
    type Item<'a>;

    /// Slices `array` into component batches, one per span yielded by `component_spans`.
    ///
    /// * `component_descriptor`: identifies the column being sliced. The implementations
    ///   visible in this file only use it in panic/log messages.
    /// * `array`: the values to slice into.
    /// * `component_spans`: where each component batch sits in `array`.
    fn slice<'a>(
        // TODO(#10460): A reference to component descriptor should be enough since the returned iterator doesn't depend on it being alive.
        // However, I wasn't able to get this idea across to the borrow checker.
        component_descriptor: ComponentDescriptor,
        array: &'a dyn ArrowArray,
        component_spans: impl Iterator<Item = Span<usize>> + 'a,
    ) -> impl Iterator<Item = Self::Item<'a>> + 'a;
}
336
337/// The actual implementation of `impl_native_type!`, so that we don't have to work in a macro.
338#[expect(clippy::needless_pass_by_value)] // The simplest way to avoid lifetime issues.
339fn slice_as_native<'a, P, T>(
340    component_descriptor: ComponentDescriptor,
341    array: &'a dyn ArrowArray,
342    component_spans: impl Iterator<Item = Span<usize>> + 'a,
343) -> impl Iterator<Item = &'a [T]> + 'a
344where
345    P: ArrowPrimitiveType<Native = T>,
346    T: ArrowNativeType,
347{
348    let Some(values) = array.downcast_array_ref::<ArrowPrimitiveArray<P>>() else {
349        if cfg!(debug_assertions) {
350            panic!("downcast failed for {component_descriptor}, data discarded");
351        } else {
352            re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
353        }
354        return Either::Left(std::iter::empty());
355    };
356    let values = values.values().as_ref();
357
358    // NOTE: No need for validity checks here, `iter_offsets` already takes care of that.
359    Either::Right(component_spans.map(move |range| &values[range.range()]))
360}
361
362// We use a macro instead of a blanket impl because this violates orphan rules.
363macro_rules! impl_native_type {
364    ($arrow_primitive_type:ty, $native_type:ty) => {
365        impl ChunkComponentSlicer for $native_type {
366            type Item<'a> = &'a [$native_type];
367
368            fn slice<'a>(
369                component_descriptor: ComponentDescriptor,
370                array: &'a dyn ArrowArray,
371                component_spans: impl Iterator<Item = Span<usize>> + 'a,
372            ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
373                slice_as_native::<$arrow_primitive_type, $native_type>(
374                    component_descriptor,
375                    array,
376                    component_spans,
377                )
378            }
379        }
380    };
381}
382
383impl_native_type!(arrow::array::types::UInt8Type, u8);
384impl_native_type!(arrow::array::types::UInt16Type, u16);
385impl_native_type!(arrow::array::types::UInt32Type, u32);
386impl_native_type!(arrow::array::types::UInt64Type, u64);
387// impl_native_type!(arrow::array::types::UInt128Type, u128);
388impl_native_type!(arrow::array::types::Int8Type, i8);
389impl_native_type!(arrow::array::types::Int16Type, i16);
390impl_native_type!(arrow::array::types::Int32Type, i32);
391impl_native_type!(arrow::array::types::Int64Type, i64);
392// impl_native_type!(arrow::array::types::Int128Type, i128);
393impl_native_type!(arrow::array::types::Float16Type, half::f16);
394impl_native_type!(arrow::array::types::Float32Type, f32);
395impl_native_type!(arrow::array::types::Float64Type, f64);
396
397/// The actual implementation of `impl_array_native_type!`, so that we don't have to work in a macro.
398#[expect(clippy::needless_pass_by_value)] // The simplest way to avoid lifetime issues.
399fn slice_as_array_native<'a, const N: usize, P, T>(
400    component_descriptor: ComponentDescriptor,
401    array: &'a dyn ArrowArray,
402    component_spans: impl Iterator<Item = Span<usize>> + 'a,
403) -> impl Iterator<Item = &'a [[T; N]]> + 'a
404where
405    [T; N]: bytemuck::Pod,
406    P: ArrowPrimitiveType<Native = T>,
407    T: ArrowNativeType + bytemuck::Pod,
408{
409    let Some(fixed_size_list_array) = array.downcast_array_ref::<ArrowFixedSizeListArray>() else {
410        if cfg!(debug_assertions) {
411            panic!("downcast failed for {component_descriptor}, data discarded");
412        } else {
413            re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
414        }
415        return Either::Left(std::iter::empty());
416    };
417
418    let Some(values) = fixed_size_list_array
419        .values()
420        .downcast_array_ref::<ArrowPrimitiveArray<P>>()
421    else {
422        if cfg!(debug_assertions) {
423            panic!("downcast failed for {component_descriptor}, data discarded");
424        } else {
425            re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
426        }
427        return Either::Left(std::iter::empty());
428    };
429
430    let size = fixed_size_list_array.value_length() as usize;
431    let values = values.values().as_ref();
432
433    // NOTE: No need for validity checks here, `component_spans` already takes care of that.
434    Either::Right(
435        component_spans.map(move |span| bytemuck::cast_slice(&values[(span * size).range()])),
436    )
437}
438
439// We use a macro instead of a blanket impl because this violates orphan rules.
440macro_rules! impl_array_native_type {
441    ($arrow_primitive_type:ty, $native_type:ty) => {
442        impl<const N: usize> ChunkComponentSlicer for [$native_type; N]
443        where
444            [$native_type; N]: bytemuck::Pod,
445        {
446            type Item<'a> = &'a [[$native_type; N]];
447
448            fn slice<'a>(
449                component_descriptor: ComponentDescriptor,
450                array: &'a dyn ArrowArray,
451                component_spans: impl Iterator<Item = Span<usize>> + 'a,
452            ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
453                slice_as_array_native::<N, $arrow_primitive_type, $native_type>(
454                    component_descriptor,
455                    array,
456                    component_spans,
457                )
458            }
459        }
460    };
461}
462
463impl_array_native_type!(arrow::array::types::UInt8Type, u8);
464impl_array_native_type!(arrow::array::types::UInt16Type, u16);
465impl_array_native_type!(arrow::array::types::UInt32Type, u32);
466impl_array_native_type!(arrow::array::types::UInt64Type, u64);
467// impl_array_native_type!(arrow::array::types::UInt128Type, u128);
468impl_array_native_type!(arrow::array::types::Int8Type, i8);
469impl_array_native_type!(arrow::array::types::Int16Type, i16);
470impl_array_native_type!(arrow::array::types::Int32Type, i32);
471impl_array_native_type!(arrow::array::types::Int64Type, i64);
472// impl_array_native_type!(arrow::array::types::Int128Type, i128);
473impl_array_native_type!(arrow::array::types::Float16Type, half::f16);
474impl_array_native_type!(arrow::array::types::Float32Type, f32);
475impl_array_native_type!(arrow::array::types::Float64Type, f64);
476
477/// The actual implementation of `impl_buffer_native_type!`, so that we don't have to work in a macro.
478#[expect(clippy::needless_pass_by_value)] // The simplest way to avoid lifetime issues.
479fn slice_as_buffer_native<'a, P, T>(
480    component_descriptor: ComponentDescriptor,
481    array: &'a dyn ArrowArray,
482    component_spans: impl Iterator<Item = Span<usize>> + 'a,
483) -> impl Iterator<Item = Vec<ArrowScalarBuffer<T>>> + 'a
484where
485    P: ArrowPrimitiveType<Native = T>,
486    T: ArrowNativeType,
487{
488    let Some(inner_list_array) = array.downcast_array_ref::<ArrowListArray>() else {
489        if cfg!(debug_assertions) {
490            panic!(
491                "DEBUG BUILD: {component_descriptor} had unexpected datatype: {:?}",
492                array.data_type()
493            );
494        } else {
495            re_log::error_once!(
496                "{component_descriptor} had unexpected datatype: {:?}. Data discarded",
497                array.data_type()
498            );
499            return Either::Left(std::iter::empty());
500        }
501    };
502
503    let Some(values) = inner_list_array
504        .values()
505        .downcast_array_ref::<ArrowPrimitiveArray<P>>()
506    else {
507        if cfg!(debug_assertions) {
508            panic!(
509                "DEBUG BUILD: {component_descriptor} had unexpected datatype: {:?}",
510                array.data_type()
511            );
512        } else {
513            re_log::error_once!(
514                "{component_descriptor} had unexpected datatype: {:?}. Data discarded",
515                array.data_type()
516            );
517            return Either::Left(std::iter::empty());
518        }
519    };
520
521    let values = values.values();
522    let offsets = inner_list_array.offsets();
523    let lengths = offsets.lengths().collect_vec();
524
525    // NOTE: No need for validity checks here, `component_spans` already takes care of that.
526    Either::Right(component_spans.map(move |span| {
527        let offsets = &offsets[span.range()];
528        let lengths = &lengths[span.range()];
529        izip!(offsets, lengths)
530            // NOTE: Not an actual clone, just a refbump of the underlying buffer.
531            .map(|(&idx, &len)| values.clone().slice(idx as _, len))
532            .collect_vec()
533    }))
534}
535
536// We special case `&[u8]` so that it works both for `List[u8]` and `Binary/LargeBinary` arrays.
537fn slice_as_u8<'a>(
538    component_descriptor: ComponentDescriptor,
539    array: &'a dyn ArrowArray,
540    component_spans: impl Iterator<Item = Span<usize>> + 'a,
541) -> impl Iterator<Item = Vec<Buffer>> + 'a {
542    if let Some(binary_array) = array.downcast_array_ref::<BinaryArray>() {
543        let values = binary_array.values();
544        let offsets = binary_array.offsets();
545        let lengths = offsets.lengths().collect_vec();
546
547        // NOTE: No need for validity checks here, `component_spans` already takes care of that.
548        Either::Left(Either::Left(component_spans.map(move |span| {
549            let offsets = &offsets[span.range()];
550            let lengths = &lengths[span.range()];
551            izip!(offsets, lengths)
552                // NOTE: Not an actual clone, just a refbump of the underlying buffer.
553                .map(|(&idx, &len)| values.clone().slice_with_length(idx as _, len))
554                .collect_vec()
555        })))
556    } else if let Some(binary_array) = array.downcast_array_ref::<LargeBinaryArray>() {
557        let values = binary_array.values();
558        let offsets = binary_array.offsets();
559        let lengths = offsets.lengths().collect_vec();
560
561        // NOTE: No need for validity checks here, `component_spans` already takes care of that.
562        Either::Left(Either::Right(component_spans.map(move |span| {
563            let offsets = &offsets[span.range()];
564            let lengths = &lengths[span.range()];
565            izip!(offsets, lengths)
566                // NOTE: Not an actual clone, just a refbump of the underlying buffer.
567                .map(|(&idx, &len)| values.clone().slice_with_length(idx as _, len))
568                .collect_vec()
569        })))
570    } else {
571        Either::Right(
572            slice_as_buffer_native::<arrow::array::types::UInt8Type, u8>(
573                component_descriptor,
574                array,
575                component_spans,
576            )
577            .map(|scalar_buffers| {
578                scalar_buffers
579                    .into_iter()
580                    .map(|scalar_buffer| scalar_buffer.into_inner())
581                    .collect_vec()
582            }),
583        )
584    }
585}
586
// We use a macro instead of a blanket impl because this violates orphan rules.
//
// For each `(arrow primitive type, native type)` pair, this implements `ChunkComponentSlicer`
// for `&[native]`, yielding one `Vec` of scalar buffers per component batch via
// `slice_as_buffer_native`.
macro_rules! impl_buffer_native_type {
    ($arrow_ty:ty, $native:ty) => {
        impl ChunkComponentSlicer for &[$native] {
            type Item<'a> = Vec<ArrowScalarBuffer<$native>>;

            fn slice<'a>(
                component_descriptor: ComponentDescriptor,
                array: &'a dyn ArrowArray,
                component_spans: impl Iterator<Item = Span<usize>> + 'a,
            ) -> impl Iterator<Item = Vec<ArrowScalarBuffer<$native>>> + 'a {
                slice_as_buffer_native::<$arrow_ty, $native>(
                    component_descriptor,
                    array,
                    component_spans,
                )
            }
        }
    };
}
607
608// We special case `&[u8]` so that it works both for `List[u8]` and `Binary` arrays.
609impl ChunkComponentSlicer for &[u8] {
610    type Item<'a> = Vec<Buffer>;
611
612    fn slice<'a>(
613        component_descriptor: ComponentDescriptor,
614        array: &'a dyn ArrowArray,
615        component_spans: impl Iterator<Item = Span<usize>> + 'a,
616    ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
617        slice_as_u8(component_descriptor, array, component_spans)
618    }
619}
620
// NOTE: `u8` is absent from this list because `&[u8]` has its own hand-written impl (backed
// by `slice_as_u8`); invoking the macro for it as well would produce a conflicting impl.
impl_buffer_native_type!(arrow::array::types::UInt16Type, u16);
impl_buffer_native_type!(arrow::array::types::UInt32Type, u32);
impl_buffer_native_type!(arrow::array::types::UInt64Type, u64);
// impl_buffer_native_type!(arrow::array::types::UInt128Type, u128);
impl_buffer_native_type!(arrow::array::types::Int8Type, i8);
impl_buffer_native_type!(arrow::array::types::Int16Type, i16);
impl_buffer_native_type!(arrow::array::types::Int32Type, i32);
impl_buffer_native_type!(arrow::array::types::Int64Type, i64);
// impl_buffer_native_type!(arrow::array::types::Int128Type, i128);
impl_buffer_native_type!(arrow::array::types::Float16Type, half::f16);
impl_buffer_native_type!(arrow::array::types::Float32Type, f32);
impl_buffer_native_type!(arrow::array::types::Float64Type, f64);
633
634/// The actual implementation of `impl_array_list_native_type!`, so that we don't have to work in a macro.
635#[expect(clippy::needless_pass_by_value)] // The simplest way to avoid lifetime issues.
636fn slice_as_array_list_native<'a, const N: usize, P, T>(
637    component_descriptor: ComponentDescriptor,
638    array: &'a dyn ArrowArray,
639    component_spans: impl Iterator<Item = Span<usize>> + 'a,
640) -> impl Iterator<Item = Vec<&'a [[T; N]]>> + 'a
641where
642    [T; N]: bytemuck::Pod,
643    P: ArrowPrimitiveType<Native = T>,
644    T: ArrowNativeType + bytemuck::Pod,
645{
646    let Some(inner_list_array) = array.downcast_array_ref::<ArrowListArray>() else {
647        if cfg!(debug_assertions) {
648            panic!("downcast failed for {component_descriptor}, data discarded");
649        } else {
650            re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
651        }
652        return Either::Left(std::iter::empty());
653    };
654
655    let inner_offsets = inner_list_array.offsets();
656    let inner_lengths = inner_offsets.lengths().collect_vec();
657
658    let Some(fixed_size_list_array) = inner_list_array
659        .values()
660        .downcast_array_ref::<ArrowFixedSizeListArray>()
661    else {
662        if cfg!(debug_assertions) {
663            panic!("downcast failed for {component_descriptor}, data discarded");
664        } else {
665            re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
666        }
667        return Either::Left(std::iter::empty());
668    };
669
670    let Some(values) = fixed_size_list_array
671        .values()
672        .downcast_array_ref::<ArrowPrimitiveArray<P>>()
673    else {
674        if cfg!(debug_assertions) {
675            panic!("downcast failed for {component_descriptor}, data discarded");
676        } else {
677            re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
678        }
679        return Either::Left(std::iter::empty());
680    };
681
682    let size = fixed_size_list_array.value_length() as usize;
683    let values = values.values();
684
685    // NOTE: No need for validity checks here, `iter_offsets` already takes care of that.
686    Either::Right(component_spans.map(move |span| {
687        let inner_offsets = &inner_offsets[span.range()];
688        let inner_lengths = &inner_lengths[span.range()];
689        izip!(inner_offsets, inner_lengths)
690            .map(|(&idx, &len)| {
691                let idx = idx as usize;
692                bytemuck::cast_slice(&values[idx * size..idx * size + len * size])
693            })
694            .collect_vec()
695    }))
696}
697
698// We use a macro instead of a blanket impl because this violates orphan rules.
699macro_rules! impl_array_list_native_type {
700    ($primitive_type:ty, $native_type:ty) => {
701        impl<const N: usize> ChunkComponentSlicer for &[[$native_type; N]]
702        where
703            [$native_type; N]: bytemuck::Pod,
704        {
705            type Item<'a> = Vec<&'a [[$native_type; N]]>;
706
707            fn slice<'a>(
708                component_descriptor: ComponentDescriptor,
709                array: &'a dyn ArrowArray,
710                component_spans: impl Iterator<Item = Span<usize>> + 'a,
711            ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
712                slice_as_array_list_native::<N, $primitive_type, $native_type>(
713                    component_descriptor,
714                    array,
715                    component_spans,
716                )
717            }
718        }
719    };
720}
721
722impl_array_list_native_type!(arrow::array::types::UInt8Type, u8);
723impl_array_list_native_type!(arrow::array::types::UInt16Type, u16);
724impl_array_list_native_type!(arrow::array::types::UInt32Type, u32);
725impl_array_list_native_type!(arrow::array::types::UInt64Type, u64);
726// impl_array_list_native_type!(arrow::array::types::UInt128Type, u128);
727impl_array_list_native_type!(arrow::array::types::Int8Type, i8);
728impl_array_list_native_type!(arrow::array::types::Int16Type, i16);
729impl_array_list_native_type!(arrow::array::types::Int32Type, i32);
730impl_array_list_native_type!(arrow::array::types::Int64Type, i64);
731// impl_array_list_native_type!(arrow::array::types::Int128Type, i128);
732impl_array_list_native_type!(arrow::array::types::Float16Type, half::f16);
733impl_array_list_native_type!(arrow::array::types::Float32Type, f32);
734impl_array_list_native_type!(arrow::array::types::Float64Type, f64);
735
736impl ChunkComponentSlicer for String {
737    type Item<'a> = Vec<ArrowString>;
738
739    fn slice<'a>(
740        component_descriptor: ComponentDescriptor,
741        array: &'a dyn ArrowArray,
742        component_spans: impl Iterator<Item = Span<usize>> + 'a,
743    ) -> impl Iterator<Item = Vec<ArrowString>> + 'a {
744        let Some(utf8_array) = array.downcast_array_ref::<ArrowStringArray>() else {
745            if cfg!(debug_assertions) {
746                panic!("downcast failed for {component_descriptor}, data discarded");
747            } else {
748                re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
749            }
750            return Either::Left(std::iter::empty());
751        };
752
753        let values = utf8_array.values().clone();
754        let offsets = utf8_array.offsets().clone();
755        let lengths = offsets.lengths().collect_vec();
756
757        // NOTE: No need for validity checks here, `component_spans` already takes care of that.
758        Either::Right(component_spans.map(move |range| {
759            let offsets = &offsets[range.range()];
760            let lengths = &lengths[range.range()];
761            izip!(offsets, lengths)
762                .map(|(&idx, &len)| ArrowString::from(values.slice_with_length(idx as _, len)))
763                .collect_vec()
764        }))
765    }
766}
767
768impl ChunkComponentSlicer for bool {
769    type Item<'a> = ArrowBooleanBuffer;
770
771    fn slice<'a>(
772        component_descriptor: ComponentDescriptor,
773        array: &'a dyn ArrowArray,
774        component_spans: impl Iterator<Item = Span<usize>> + 'a,
775    ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
776        let Some(values) = array.downcast_array_ref::<ArrowBooleanArray>() else {
777            if cfg!(debug_assertions) {
778                panic!("downcast failed for {component_descriptor}, data discarded");
779            } else {
780                re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
781            }
782            return Either::Left(std::iter::empty());
783        };
784        let values = values.values().clone();
785
786        // NOTE: No need for validity checks here, `component_spans` already takes care of that.
787        Either::Right(
788            component_spans.map(move |Span { start, len }| values.clone().slice(start, len)),
789        )
790    }
791}
792
793// ---
794
/// The actual iterator implementation for [`Chunk::iter_indices_owned`].
///
/// Yields one `(TimeInt, RowId)` pair per row, walking the rows in order.
pub struct ChunkIndicesIter {
    /// The chunk whose row IDs are being iterated; kept alive by the iterator itself.
    chunk: Arc<Chunk>,

    /// The time column to read timestamps from, or `None` to yield [`TimeInt::STATIC`] for
    /// every row.
    time_column: Option<TimeColumn>,

    /// Index of the next row to yield.
    index: usize,
}
801
802impl Iterator for ChunkIndicesIter {
803    type Item = (TimeInt, RowId);
804
805    fn next(&mut self) -> Option<Self::Item> {
806        let i = self.index;
807        self.index += 1;
808
809        let row_id = *self.chunk.row_ids_slice().get(i)?;
810
811        if let Some(time_column) = &self.time_column {
812            let time = *time_column.times_raw().get(i)?;
813            let time = TimeInt::new_temporal(time);
814            Some((time, row_id))
815        } else {
816            Some((TimeInt::STATIC, row_id))
817        }
818    }
819}
820
821impl Chunk {
822    /// Returns an iterator over the indices (`(TimeInt, RowId)`) of a [`Chunk`], for a given timeline.
823    ///
824    /// If the chunk is static, `timeline` will be ignored.
825    ///
826    /// The returned iterator outlives `self`, thus it can be passed around freely.
827    /// The tradeoff is that `self` must be an `Arc`.
828    ///
829    /// See also [`Self::iter_indices`].
830    #[inline]
831    pub fn iter_indices_owned(
832        self: Arc<Self>,
833        timeline: &TimelineName,
834    ) -> impl Iterator<Item = (TimeInt, RowId)> + use<> {
835        if self.is_static() {
836            Either::Left(ChunkIndicesIter {
837                chunk: self,
838                time_column: None,
839                index: 0,
840            })
841        } else {
842            self.timelines.get(timeline).cloned().map_or_else(
843                || Either::Right(Either::Left(std::iter::empty())),
844                |time_column| {
845                    Either::Right(Either::Right(ChunkIndicesIter {
846                        chunk: self,
847                        time_column: Some(time_column),
848                        index: 0,
849                    }))
850                },
851            )
852        }
853    }
854}
855
856// ---
857
/// The actual iterator implementation for [`Chunk::iter_component`].
///
/// `C` is the deserialized component type, `IO` the iterator producing the span of each batch.
pub struct ChunkComponentIter<C, IO> {
    /// All the deserialized values, shared with every item that gets yielded.
    values: Arc<Vec<C>>,

    /// Yields, for each batch, the span of `values` that the batch covers.
    offsets: IO,
}
863
/// The underlying item type for [`ChunkComponentIter`].
///
/// This allows us to cheaply carry slices of deserialized data, while working around the
/// limitations of Rust's Iterator trait and ecosystem.
///
/// Note that the derived `PartialEq` compares both fields (the whole backing `values` and the
/// `span`), not just the viewed slice. Compare against a `[C]` or a `Vec<C>` to compare the
/// viewed elements only.
///
/// See [`ChunkComponentIterItem::as_slice`].
#[derive(Clone, PartialEq)]
pub struct ChunkComponentIterItem<C> {
    /// The shared backing storage that this item views into.
    values: Arc<Vec<C>>,

    /// The part of `values` that this item covers.
    span: Span<usize>,
}
875
876impl<C: PartialEq> PartialEq<[C]> for ChunkComponentIterItem<C> {
877    fn eq(&self, rhs: &[C]) -> bool {
878        self.as_slice().eq(rhs)
879    }
880}
881
882impl<C: PartialEq> PartialEq<Vec<C>> for ChunkComponentIterItem<C> {
883    fn eq(&self, rhs: &Vec<C>) -> bool {
884        self.as_slice().eq(rhs)
885    }
886}
887
// Marker impl: equality on the item is total whenever the element type itself is `Eq`.
impl<C: Eq> Eq for ChunkComponentIterItem<C> {}
889
890// NOTE: No `C: Default`!
891impl<C> Default for ChunkComponentIterItem<C> {
892    #[inline]
893    fn default() -> Self {
894        Self {
895            values: Arc::new(Vec::new()),
896            span: Span::default(),
897        }
898    }
899}
900
901impl<C> ChunkComponentIterItem<C> {
902    #[inline]
903    pub fn as_slice(&self) -> &[C] {
904        &self.values[self.span.range()]
905    }
906}
907
908impl<C> std::ops::Deref for ChunkComponentIterItem<C> {
909    type Target = [C];
910
911    #[inline]
912    fn deref(&self) -> &Self::Target {
913        self.as_slice()
914    }
915}
916
917impl<C: Component, IO: Iterator<Item = Span<usize>>> Iterator for ChunkComponentIter<C, IO> {
918    type Item = ChunkComponentIterItem<C>;
919
920    #[inline]
921    fn next(&mut self) -> Option<Self::Item> {
922        self.offsets.next().map(move |span| ChunkComponentIterItem {
923            values: Arc::clone(&self.values),
924            span,
925        })
926    }
927}
928
929impl Chunk {
930    /// Returns an iterator over the deserialized batches of a [`Chunk`], for a given component.
931    ///
932    /// This is a dedicated fast path: the entire column will be downcasted and deserialized at
933    /// once, and then every component batch will be a slice reference into that global slice.
934    /// Use this when working with complex arrow datatypes and performance matters (e.g. ranging
935    /// through enum types across many timestamps).
936    ///
937    /// TODO(#5305): Note that, while this is much faster than deserializing each row individually,
938    /// this still uses the old codegen'd deserialization path, which does some very unidiomatic Arrow
939    /// things, and is therefore very slow at the moment. Avoid this on performance critical paths.
940    ///
941    /// See also:
942    /// * [`Self::iter_slices`]
943    /// * [`Self::iter_slices_from_struct_field`]
944    #[inline]
945    pub fn iter_component<C: Component>(
946        &self,
947        component_descriptor: &ComponentDescriptor,
948    ) -> ChunkComponentIter<C, impl Iterator<Item = Span<usize>> + '_ + use<'_, C>> {
949        debug_assert_eq!(
950            component_descriptor.component_type,
951            Some(C::name()),
952            "component type mismatch"
953        );
954
955        let Some(list_array) = self.components.get(component_descriptor) else {
956            return ChunkComponentIter {
957                values: Arc::new(vec![]),
958                offsets: Either::Left(std::iter::empty()),
959            };
960        };
961
962        let values = arrow::array::ArrayRef::from(list_array.values().clone());
963        let values = match C::from_arrow(&values) {
964            Ok(values) => values,
965            Err(err) => {
966                if cfg!(debug_assertions) {
967                    panic!(
968                        "[DEBUG-ONLY] deserialization failed for {}, data discarded: {}",
969                        C::name(),
970                        re_error::format_ref(&err),
971                    );
972                } else {
973                    re_log::error_once!(
974                        "deserialization failed for {}, data discarded: {}",
975                        C::name(),
976                        re_error::format_ref(&err),
977                    );
978                }
979                return ChunkComponentIter {
980                    values: Arc::new(vec![]),
981                    offsets: Either::Left(std::iter::empty()),
982                };
983            }
984        };
985
986        // NOTE: No need for validity checks here, `iter_offsets` already takes care of that.
987        ChunkComponentIter {
988            values: Arc::new(values),
989            offsets: Either::Right(self.iter_component_offsets(component_descriptor)),
990        }
991    }
992}
993
994// ---
995
// Tests for `Chunk::iter_indices_owned`, on both temporal and static chunks.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use itertools::{Itertools as _, izip};
    use re_log_types::{
        EntityPath, TimeInt, TimePoint,
        example_components::{MyPoint, MyPoints},
    };

    use crate::{Chunk, RowId, Timeline};

    /// On a temporal chunk, the owned iterator must yield every row's `(time, row_id)` pair,
    /// matching the chunk's time column zipped with its row IDs.
    #[test]
    fn iter_indices_temporal() -> anyhow::Result<()> {
        let entity_path = EntityPath::from("this/that");

        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();
        let row_id5 = RowId::new();

        let timeline_frame = Timeline::new_sequence("frame");

        let timepoint1 = [(timeline_frame, 1)];
        let timepoint2 = [(timeline_frame, 3)];
        let timepoint3 = [(timeline_frame, 5)];
        let timepoint4 = [(timeline_frame, 7)];
        let timepoint5 = [(timeline_frame, 9)];

        let points1 = &[MyPoint::new(1.0, 1.0)];
        let points2 = &[MyPoint::new(2.0, 2.0)];
        let points3 = &[MyPoint::new(3.0, 3.0)];
        let points4 = &[MyPoint::new(4.0, 4.0)];
        let points5 = &[MyPoint::new(5.0, 5.0)];

        // Five rows, each with a single point and a distinct timestamp on the `frame` timeline.
        let chunk = Arc::new(
            Chunk::builder(entity_path.clone())
                .with_component_batches(
                    row_id1,
                    timepoint1,
                    [(MyPoints::descriptor_points(), points1 as _)],
                )
                .with_component_batches(
                    row_id2,
                    timepoint2,
                    [(MyPoints::descriptor_points(), points2 as _)],
                )
                .with_component_batches(
                    row_id3,
                    timepoint3,
                    [(MyPoints::descriptor_points(), points3 as _)],
                )
                .with_component_batches(
                    row_id4,
                    timepoint4,
                    [(MyPoints::descriptor_points(), points4 as _)],
                )
                .with_component_batches(
                    row_id5,
                    timepoint5,
                    [(MyPoints::descriptor_points(), points5 as _)],
                )
                .build()?,
        );

        {
            // `iter_indices_owned` consumes the `Arc`, hence the clone.
            let got = Arc::clone(&chunk)
                .iter_indices_owned(timeline_frame.name())
                .collect_vec();
            // Reference result, built straight from the chunk's time column and row IDs.
            let expected = izip!(
                chunk
                    .timelines
                    .get(timeline_frame.name())
                    .map(|time_column| time_column.times().collect_vec())
                    .unwrap_or_default(),
                chunk.row_ids()
            )
            .collect_vec();

            similar_asserts::assert_eq!(expected, got);
        }

        Ok(())
    }

    /// On a static chunk, the owned iterator must pair every row ID with `TimeInt::STATIC`,
    /// whatever timeline is requested.
    #[test]
    fn iter_indices_static() -> anyhow::Result<()> {
        let entity_path = EntityPath::from("this/that");

        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();
        let row_id5 = RowId::new();

        let timeline_frame = Timeline::new_sequence("frame");

        let points1 = &[MyPoint::new(1.0, 1.0)];
        let points2 = &[MyPoint::new(2.0, 2.0)];
        let points3 = &[MyPoint::new(3.0, 3.0)];
        let points4 = &[MyPoint::new(4.0, 4.0)];
        let points5 = &[MyPoint::new(5.0, 5.0)];

        // Five rows logged with an empty (default) timepoint.
        let chunk = Arc::new(
            Chunk::builder(entity_path.clone())
                .with_component_batches(
                    row_id1,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points1 as _)],
                )
                .with_component_batches(
                    row_id2,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points2 as _)],
                )
                .with_component_batches(
                    row_id3,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points3 as _)],
                )
                .with_component_batches(
                    row_id4,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points4 as _)],
                )
                .with_component_batches(
                    row_id5,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points5 as _)],
                )
                .build()?,
        );

        {
            // `iter_indices_owned` consumes the `Arc`, hence the clone.
            let got = Arc::clone(&chunk)
                .iter_indices_owned(timeline_frame.name())
                .collect_vec();
            let expected = izip!(std::iter::repeat(TimeInt::STATIC), chunk.row_ids()).collect_vec();

            similar_asserts::assert_eq!(expected, got);
        }

        Ok(())
    }
}