1use std::sync::Arc;
2
3use arrow::array::{
4 Array as ArrowArray, ArrayRef as ArrowArrayRef, ArrowPrimitiveType, BinaryArray,
5 BooleanArray as ArrowBooleanArray, FixedSizeListArray as ArrowFixedSizeListArray,
6 LargeBinaryArray, ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray,
7 StringArray as ArrowStringArray, StructArray as ArrowStructArray,
8};
9use arrow::buffer::{
10 BooleanBuffer as ArrowBooleanBuffer, Buffer, ScalarBuffer as ArrowScalarBuffer,
11};
12use arrow::datatypes::ArrowNativeType;
13use itertools::{Either, Itertools as _, izip};
14use re_arrow_util::ArrowArrayDowncastRef as _;
15use re_log_types::{TimeInt, TimePoint, TimelineName};
16use re_span::Span;
17use re_types_core::{ArrowString, Component, ComponentIdentifier};
18
19use crate::{Chunk, RowId, TimeColumn};
20
/// Reports that downcasting the Arrow data of `component` to `target` failed.
///
/// The same message is handed to both `re_log::debug_panic!` and `re_log::error_once!`.
/// It names the expected `target` type, the `component`, and the `actual` Arrow datatype
/// that was encountered instead.
///
/// This function only reports; the callers in this file follow it up by returning an empty
/// iterator, which is what the "Data discarded" wording refers to.
fn error_on_downcast_failure(
    component: ComponentIdentifier,
    target: &str,
    actual: &arrow::datatypes::DataType,
) {
    // NOTE(review): presumably `debug_panic!` only panics in debug builds, which is why the
    // error log below is still needed — confirm against `re_log`.
    re_log::debug_panic!(
        "downcast to {target} failed for {component}. Array data type was {actual:?}. Data discarded"
    );
    re_log::error_once!(
        "downcast to {target} failed for {component}. Array data type was {actual:?}. Data discarded"
    );
}
35
36impl Chunk {
46 pub fn raw_component_array(&self, component: ComponentIdentifier) -> Option<&ArrowArrayRef> {
50 self.components
51 .get_array(component)
52 .map(|list_array| list_array.values())
53 }
54
55 #[inline]
63 pub fn iter_indices(
64 &self,
65 timeline: &TimelineName,
66 ) -> impl Iterator<Item = (TimeInt, RowId)> + '_ + use<'_> {
67 if self.is_static() {
68 Either::Right(Either::Left(izip!(
69 std::iter::repeat(TimeInt::STATIC),
70 self.row_ids()
71 )))
72 } else {
73 let Some(time_column) = self.timelines.get(timeline) else {
74 return Either::Left(std::iter::empty());
75 };
76
77 Either::Right(Either::Right(izip!(time_column.times(), self.row_ids())))
78 }
79 }
80
81 pub fn iter_component_indices(
91 &self,
92 timeline: TimelineName,
93 component: ComponentIdentifier,
94 ) -> impl Iterator<Item = (TimeInt, RowId)> + '_ + use<'_> {
95 let Some(list_array) = self.components.get_array(component) else {
96 return Either::Left(std::iter::empty());
97 };
98
99 if self.is_static() {
100 let indices = izip!(std::iter::repeat(TimeInt::STATIC), self.row_ids());
101
102 if let Some(validity) = list_array.nulls() {
103 Either::Right(Either::Left(Either::Left(
104 indices
105 .enumerate()
106 .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
107 )))
108 } else {
109 Either::Right(Either::Left(Either::Right(indices)))
110 }
111 } else {
112 let Some(time_column) = self.timelines.get(&timeline) else {
113 return Either::Left(std::iter::empty());
114 };
115
116 let indices = izip!(time_column.times(), self.row_ids());
117
118 if let Some(validity) = list_array.nulls() {
119 Either::Right(Either::Right(Either::Left(
120 indices
121 .enumerate()
122 .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
123 )))
124 } else {
125 Either::Right(Either::Right(Either::Right(indices)))
126 }
127 }
128 }
129
130 #[inline]
135 pub fn iter_timepoints(&self) -> impl Iterator<Item = TimePoint> + '_ {
136 let mut timelines = self
137 .timelines
138 .values()
139 .map(|time_column| (time_column.timeline, time_column.times()))
140 .collect_vec();
141
142 std::iter::from_fn(move || {
143 let mut timepoint = TimePoint::default();
144 for (timeline, times) in &mut timelines {
145 timepoint.insert(*timeline, times.next()?);
146 }
147 Some(timepoint)
148 })
149 }
150
151 pub fn iter_component_timepoints(
158 &self,
159 component: ComponentIdentifier,
160 ) -> impl Iterator<Item = TimePoint> + '_ + use<'_> {
161 let Some(list_array) = self.components.get_array(component) else {
162 return Either::Left(std::iter::empty());
163 };
164
165 if let Some(validity) = list_array.nulls() {
166 let mut timelines = self
167 .timelines
168 .values()
169 .map(|time_column| {
170 (
171 time_column.timeline,
172 time_column
173 .times()
174 .enumerate()
175 .filter(|(i, _)| validity.is_valid(*i))
176 .map(|(_, time)| time),
177 )
178 })
179 .collect_vec();
180
181 Either::Right(Either::Left(std::iter::from_fn(move || {
182 let mut timepoint = TimePoint::default();
183 for (timeline, times) in &mut timelines {
184 timepoint.insert(*timeline, times.next()?);
185 }
186 Some(timepoint)
187 })))
188 } else {
189 let mut timelines = self
190 .timelines
191 .values()
192 .map(|time_column| (time_column.timeline, time_column.times()))
193 .collect_vec();
194
195 Either::Right(Either::Right(std::iter::from_fn(move || {
196 let mut timepoint = TimePoint::default();
197 for (timeline, times) in &mut timelines {
198 timepoint.insert(*timeline, times.next()?);
199 }
200 Some(timepoint)
201 })))
202 }
203 }
204
205 pub fn iter_component_offsets(
211 &self,
212 component: ComponentIdentifier,
213 ) -> impl Iterator<Item = Span<usize>> {
214 let Some(list_array) = self.components.get_array(component) else {
215 return Either::Left(std::iter::empty());
216 };
217
218 let offsets = list_array.offsets().iter().map(|idx| *idx as usize);
219 let lengths = list_array.offsets().lengths();
220
221 if let Some(validity) = list_array.nulls() {
222 Either::Right(Either::Left(
223 izip!(offsets, lengths)
224 .enumerate()
225 .filter_map(|(i, o)| validity.is_valid(i).then_some(o))
226 .map(|(start, len)| Span { start, len }),
227 ))
228 } else {
229 Either::Right(Either::Right(
230 izip!(offsets, lengths).map(|(start, len)| Span { start, len }),
231 ))
232 }
233 }
234
235 #[inline]
246 pub fn iter_slices<'a, S: 'a + ChunkComponentSlicer>(
247 &'a self,
248 component: ComponentIdentifier,
249 ) -> impl Iterator<Item = S::Item<'a>> + 'a + use<'a, S> {
250 let Some(list_array) = self.components.get_array(component) else {
251 return Either::Left(std::iter::empty());
252 };
253
254 let component_offset_values = self.iter_component_offsets(component);
255
256 Either::Right(S::slice(
257 component,
258 &**list_array.values() as _,
259 component_offset_values,
260 ))
261 }
262
263 pub fn iter_slices_from_struct_field<'a, S: 'a + ChunkComponentSlicer>(
276 &'a self,
277 component: ComponentIdentifier,
278 field_name: &'a str,
279 ) -> impl Iterator<Item = S::Item<'a>> + 'a {
280 let Some(list_array) = self.components.get_array(component) else {
281 return Either::Left(std::iter::empty());
282 };
283
284 let Some(struct_array) = list_array.values().downcast_array_ref::<ArrowStructArray>()
285 else {
286 error_on_downcast_failure(component, "ArrowStructArray", list_array.data_type());
287 return Either::Left(std::iter::empty());
288 };
289
290 let Some(field_idx) = struct_array
291 .fields()
292 .iter()
293 .enumerate()
294 .find_map(|(i, field)| (field.name() == field_name).then_some(i))
295 else {
296 re_log::debug_panic!("field {field_name} not found for {component}, data discarded");
297 re_log::error_once!("field {field_name} not found for {component}, data discarded");
298 return Either::Left(std::iter::empty());
299 };
300
301 if field_idx >= struct_array.num_columns() {
302 re_log::debug_panic!("field {field_name} not found for {component}, data discarded");
303 re_log::error_once!("field {field_name} not found for {component}, data discarded");
304 return Either::Left(std::iter::empty());
305 }
306
307 let component_offset_values = self.iter_component_offsets(component);
308
309 Either::Right(S::slice(
310 component,
311 struct_array.column(field_idx),
312 component_offset_values,
313 ))
314 }
315}
316
/// Describes how to cut the flattened Arrow values of a component into per-row items.
///
/// Used as the type parameter of `Chunk::iter_slices` and
/// `Chunk::iter_slices_from_struct_field`, which hand the component's values array and the
/// spans of its rows to [`ChunkComponentSlicer::slice`].
pub trait ChunkComponentSlicer {
    /// The item produced for each span; it may borrow from the sliced array for `'a`.
    type Item<'a>;

    /// Produces the items for `component_spans`, each span indexing into `array`.
    ///
    /// `component` identifies the data being sliced; the implementations in this file only
    /// use it for error reporting.
    fn slice<'a>(
        component: ComponentIdentifier,
        array: &'a dyn ArrowArray,
        component_spans: impl Iterator<Item = Span<usize>> + 'a,
    ) -> impl Iterator<Item = Self::Item<'a>>;
}
331
332fn slice_as_native<'a, P, T>(
334 component: ComponentIdentifier,
335 array: &'a dyn ArrowArray,
336 component_spans: impl Iterator<Item = Span<usize>> + 'a,
337) -> impl Iterator<Item = &'a [T]> + 'a
338where
339 P: ArrowPrimitiveType<Native = T>,
340 T: ArrowNativeType,
341{
342 let Some(values) = array.downcast_array_ref::<ArrowPrimitiveArray<P>>() else {
343 error_on_downcast_failure(component, "ArrowPrimitiveArray<T>", array.data_type());
344 return Either::Left(std::iter::empty());
345 };
346 let values = values.values().as_ref();
347
348 Either::Right(component_spans.map(move |range| &values[range.range()]))
350}
351
// Implements `ChunkComponentSlicer` for the native scalar type `$native_type` by forwarding to
// `slice_as_native`, so that every item is a `&[$native_type]`.
//
// `$arrow_primitive_type` is the Arrow primitive type whose arrays are downcast to.
macro_rules! impl_native_type {
    ($arrow_primitive_type:ty, $native_type:ty) => {
        impl ChunkComponentSlicer for $native_type {
            type Item<'a> = &'a [$native_type];

            fn slice<'a>(
                component: ComponentIdentifier,
                array: &'a dyn ArrowArray,
                component_spans: impl Iterator<Item = Span<usize>> + 'a,
            ) -> impl Iterator<Item = Self::Item<'a>> {
                slice_as_native::<$arrow_primitive_type, $native_type>(
                    component,
                    array,
                    component_spans,
                )
            }
        }
    };
}

// Unsigned integers.
impl_native_type!(arrow::array::types::UInt8Type, u8);
impl_native_type!(arrow::array::types::UInt16Type, u16);
impl_native_type!(arrow::array::types::UInt32Type, u32);
impl_native_type!(arrow::array::types::UInt64Type, u64);
// Signed integers.
impl_native_type!(arrow::array::types::Int8Type, i8);
impl_native_type!(arrow::array::types::Int16Type, i16);
impl_native_type!(arrow::array::types::Int32Type, i32);
impl_native_type!(arrow::array::types::Int64Type, i64);
// Floating point.
impl_native_type!(arrow::array::types::Float16Type, half::f16);
impl_native_type!(arrow::array::types::Float32Type, f32);
impl_native_type!(arrow::array::types::Float64Type, f64);
386
387fn slice_as_array_native<'a, const N: usize, P, T>(
389 component: ComponentIdentifier,
390 array: &'a dyn ArrowArray,
391 component_spans: impl Iterator<Item = Span<usize>> + 'a,
392) -> impl Iterator<Item = &'a [[T; N]]> + 'a
393where
394 [T; N]: bytemuck::Pod,
395 P: ArrowPrimitiveType<Native = T>,
396 T: ArrowNativeType + bytemuck::Pod,
397{
398 let Some(fixed_size_list_array) = array.downcast_array_ref::<ArrowFixedSizeListArray>() else {
399 error_on_downcast_failure(component, "ArrowFixedSizeListArray", array.data_type());
400 return Either::Left(std::iter::empty());
401 };
402
403 let Some(values) = fixed_size_list_array
404 .values()
405 .downcast_array_ref::<ArrowPrimitiveArray<P>>()
406 else {
407 error_on_downcast_failure(
408 component,
409 "ArrowPrimitiveArray<P>",
410 fixed_size_list_array.data_type(),
411 );
412 return Either::Left(std::iter::empty());
413 };
414
415 let size = fixed_size_list_array.value_length() as usize;
416 let values = values.values().as_ref();
417
418 Either::Right(
420 component_spans.map(move |span| bytemuck::cast_slice(&values[(span * size).range()])),
421 )
422}
423
// Implements `ChunkComponentSlicer` for fixed-size arrays `[$native_type; N]` by forwarding to
// `slice_as_array_native`, so that every item is a `&[[$native_type; N]]`.
//
// `$arrow_primitive_type` is the Arrow primitive type of the innermost values.
macro_rules! impl_array_native_type {
    ($arrow_primitive_type:ty, $native_type:ty) => {
        impl<const N: usize> ChunkComponentSlicer for [$native_type; N]
        where
            [$native_type; N]: bytemuck::Pod,
        {
            type Item<'a> = &'a [[$native_type; N]];

            fn slice<'a>(
                component: ComponentIdentifier,
                array: &'a dyn ArrowArray,
                component_spans: impl Iterator<Item = Span<usize>> + 'a,
            ) -> impl Iterator<Item = Self::Item<'a>> {
                slice_as_array_native::<N, $arrow_primitive_type, $native_type>(
                    component,
                    array,
                    component_spans,
                )
            }
        }
    };
}

// Unsigned integers.
impl_array_native_type!(arrow::array::types::UInt8Type, u8);
impl_array_native_type!(arrow::array::types::UInt16Type, u16);
impl_array_native_type!(arrow::array::types::UInt32Type, u32);
impl_array_native_type!(arrow::array::types::UInt64Type, u64);
// Signed integers.
impl_array_native_type!(arrow::array::types::Int8Type, i8);
impl_array_native_type!(arrow::array::types::Int16Type, i16);
impl_array_native_type!(arrow::array::types::Int32Type, i32);
impl_array_native_type!(arrow::array::types::Int64Type, i64);
// Floating point.
impl_array_native_type!(arrow::array::types::Float16Type, half::f16);
impl_array_native_type!(arrow::array::types::Float32Type, f32);
impl_array_native_type!(arrow::array::types::Float64Type, f64);
461
462fn slice_as_buffer_native<'a, P, T>(
464 component: ComponentIdentifier,
465 array: &'a dyn ArrowArray,
466 component_spans: impl Iterator<Item = Span<usize>> + 'a,
467) -> impl Iterator<Item = Vec<ArrowScalarBuffer<T>>> + 'a
468where
469 P: ArrowPrimitiveType<Native = T>,
470 T: ArrowNativeType,
471{
472 let Some(inner_list_array) = array.downcast_array_ref::<ArrowListArray>() else {
473 error_on_downcast_failure(component, "ArrowListArray", array.data_type());
474 return Either::Left(std::iter::empty());
475 };
476
477 let Some(values) = inner_list_array
478 .values()
479 .downcast_array_ref::<ArrowPrimitiveArray<P>>()
480 else {
481 error_on_downcast_failure(
482 component,
483 "ArrowPrimitiveArray<P>",
484 inner_list_array.data_type(),
485 );
486 return Either::Left(std::iter::empty());
487 };
488
489 let values = values.values();
490 let offsets = inner_list_array.offsets();
491 let lengths = offsets.lengths().collect_vec();
492
493 Either::Right(component_spans.map(move |span| {
495 let offsets = &offsets[span.range()];
496 let lengths = &lengths[span.range()];
497 izip!(offsets, lengths)
498 .map(|(&idx, &len)| values.clone().slice(idx as _, len))
500 .collect_vec()
501 }))
502}
503
504fn slice_as_u8<'a>(
506 component: ComponentIdentifier,
507 array: &'a dyn ArrowArray,
508 component_spans: impl Iterator<Item = Span<usize>> + 'a,
509) -> impl Iterator<Item = Vec<Buffer>> + 'a {
510 if let Some(binary_array) = array.downcast_array_ref::<BinaryArray>() {
511 let values = binary_array.values();
512 let offsets = binary_array.offsets();
513 let lengths = offsets.lengths().collect_vec();
514
515 Either::Left(Either::Left(component_spans.map(move |span| {
517 let offsets = &offsets[span.range()];
518 let lengths = &lengths[span.range()];
519 izip!(offsets, lengths)
520 .map(|(&idx, &len)| values.clone().slice_with_length(idx as _, len))
522 .collect_vec()
523 })))
524 } else if let Some(binary_array) = array.downcast_array_ref::<LargeBinaryArray>() {
525 let values = binary_array.values();
526 let offsets = binary_array.offsets();
527 let lengths = offsets.lengths().collect_vec();
528
529 Either::Left(Either::Right(component_spans.map(move |span| {
531 let offsets = &offsets[span.range()];
532 let lengths = &lengths[span.range()];
533 izip!(offsets, lengths)
534 .map(|(&idx, &len)| values.clone().slice_with_length(idx as _, len))
536 .collect_vec()
537 })))
538 } else {
539 Either::Right(
540 slice_as_buffer_native::<arrow::array::types::UInt8Type, u8>(
541 component,
542 array,
543 component_spans,
544 )
545 .map(|scalar_buffers| {
546 scalar_buffers
547 .into_iter()
548 .map(|scalar_buffer| scalar_buffer.into_inner())
549 .collect_vec()
550 }),
551 )
552 }
553}
554
// Implements `ChunkComponentSlicer` for `&[$native_type]` by forwarding to
// `slice_as_buffer_native`, so that every item is a `Vec<ArrowScalarBuffer<$native_type>>`.
//
// `$primitive_type` is the Arrow primitive type of the innermost values.
macro_rules! impl_buffer_native_type {
    ($primitive_type:ty, $native_type:ty) => {
        impl ChunkComponentSlicer for &[$native_type] {
            type Item<'a> = Vec<ArrowScalarBuffer<$native_type>>;

            fn slice<'a>(
                component: ComponentIdentifier,
                array: &'a dyn ArrowArray,
                component_spans: impl Iterator<Item = Span<usize>> + 'a,
            ) -> impl Iterator<Item = Self::Item<'a>> {
                slice_as_buffer_native::<$primitive_type, $native_type>(
                    component,
                    array,
                    component_spans,
                )
            }
        }
    };
}
575
/// Byte blobs get a hand-written implementation instead of going through
/// `impl_buffer_native_type!`: items are untyped `Buffer`s, and `slice_as_u8` accepts binary
/// arrays in addition to lists of `u8`.
impl ChunkComponentSlicer for &[u8] {
    type Item<'a> = Vec<Buffer>;

    fn slice<'a>(
        component: ComponentIdentifier,
        array: &'a dyn ArrowArray,
        component_spans: impl Iterator<Item = Span<usize>> + 'a,
    ) -> impl Iterator<Item = Self::Item<'a>> {
        slice_as_u8(component, array, component_spans)
    }
}
588
// `u8` is deliberately absent from this list: `&[u8]` has its own implementation above.
//
// Unsigned integers.
impl_buffer_native_type!(arrow::array::types::UInt16Type, u16);
impl_buffer_native_type!(arrow::array::types::UInt32Type, u32);
impl_buffer_native_type!(arrow::array::types::UInt64Type, u64);
// Signed integers.
impl_buffer_native_type!(arrow::array::types::Int8Type, i8);
impl_buffer_native_type!(arrow::array::types::Int16Type, i16);
impl_buffer_native_type!(arrow::array::types::Int32Type, i32);
impl_buffer_native_type!(arrow::array::types::Int64Type, i64);
// Floating point.
impl_buffer_native_type!(arrow::array::types::Float16Type, half::f16);
impl_buffer_native_type!(arrow::array::types::Float32Type, f32);
impl_buffer_native_type!(arrow::array::types::Float64Type, f64);
601
602fn slice_as_array_list_native<'a, const N: usize, P, T>(
604 component: ComponentIdentifier,
605 array: &'a dyn ArrowArray,
606 component_spans: impl Iterator<Item = Span<usize>> + 'a,
607) -> impl Iterator<Item = Vec<&'a [[T; N]]>> + 'a
608where
609 [T; N]: bytemuck::Pod,
610 P: ArrowPrimitiveType<Native = T>,
611 T: ArrowNativeType + bytemuck::Pod,
612{
613 let Some(inner_list_array) = array.downcast_array_ref::<ArrowListArray>() else {
614 error_on_downcast_failure(component, "ArrowListArray", array.data_type());
615 return Either::Left(std::iter::empty());
616 };
617
618 let inner_offsets = inner_list_array.offsets();
619 let inner_lengths = inner_offsets.lengths().collect_vec();
620
621 let Some(fixed_size_list_array) = inner_list_array
622 .values()
623 .downcast_array_ref::<ArrowFixedSizeListArray>()
624 else {
625 error_on_downcast_failure(
626 component,
627 "ArrowFixedSizeListArray",
628 inner_list_array.data_type(),
629 );
630 return Either::Left(std::iter::empty());
631 };
632
633 let Some(values) = fixed_size_list_array
634 .values()
635 .downcast_array_ref::<ArrowPrimitiveArray<P>>()
636 else {
637 error_on_downcast_failure(
638 component,
639 "ArrowPrimitiveArray<P>",
640 fixed_size_list_array.data_type(),
641 );
642 return Either::Left(std::iter::empty());
643 };
644
645 let size = fixed_size_list_array.value_length() as usize;
646 let values = values.values();
647
648 Either::Right(component_spans.map(move |span| {
650 let inner_offsets = &inner_offsets[span.range()];
651 let inner_lengths = &inner_lengths[span.range()];
652 izip!(inner_offsets, inner_lengths)
653 .map(|(&idx, &len)| {
654 let idx = idx as usize;
655 bytemuck::cast_slice(&values[idx * size..idx * size + len * size])
656 })
657 .collect_vec()
658 }))
659}
660
// Implements `ChunkComponentSlicer` for `&[[$native_type; N]]` by forwarding to
// `slice_as_array_list_native`, so that every item is a `Vec<&[[$native_type; N]]>`.
//
// `$primitive_type` is the Arrow primitive type of the innermost values.
macro_rules! impl_array_list_native_type {
    ($primitive_type:ty, $native_type:ty) => {
        impl<const N: usize> ChunkComponentSlicer for &[[$native_type; N]]
        where
            [$native_type; N]: bytemuck::Pod,
        {
            type Item<'a> = Vec<&'a [[$native_type; N]]>;

            fn slice<'a>(
                component: ComponentIdentifier,
                array: &'a dyn ArrowArray,
                component_spans: impl Iterator<Item = Span<usize>> + 'a,
            ) -> impl Iterator<Item = Self::Item<'a>> {
                slice_as_array_list_native::<N, $primitive_type, $native_type>(
                    component,
                    array,
                    component_spans,
                )
            }
        }
    };
}

// Unsigned integers.
impl_array_list_native_type!(arrow::array::types::UInt8Type, u8);
impl_array_list_native_type!(arrow::array::types::UInt16Type, u16);
impl_array_list_native_type!(arrow::array::types::UInt32Type, u32);
impl_array_list_native_type!(arrow::array::types::UInt64Type, u64);
// Signed integers.
impl_array_list_native_type!(arrow::array::types::Int8Type, i8);
impl_array_list_native_type!(arrow::array::types::Int16Type, i16);
impl_array_list_native_type!(arrow::array::types::Int32Type, i32);
impl_array_list_native_type!(arrow::array::types::Int64Type, i64);
// Floating point.
impl_array_list_native_type!(arrow::array::types::Float16Type, half::f16);
impl_array_list_native_type!(arrow::array::types::Float32Type, f32);
impl_array_list_native_type!(arrow::array::types::Float64Type, f64);
698
699impl ChunkComponentSlicer for String {
700 type Item<'a> = Vec<ArrowString>;
701
702 fn slice<'a>(
703 component: ComponentIdentifier,
704 array: &'a dyn ArrowArray,
705 component_spans: impl Iterator<Item = Span<usize>> + 'a,
706 ) -> impl Iterator<Item = Vec<ArrowString>> {
707 let Some(utf8_array) = array.downcast_array_ref::<ArrowStringArray>() else {
708 error_on_downcast_failure(component, "ArrowStringArray", array.data_type());
709 return Either::Left(std::iter::empty());
710 };
711
712 let values = utf8_array.values().clone();
713 let offsets = utf8_array.offsets().clone();
714 let lengths = offsets.lengths().collect_vec();
715
716 Either::Right(component_spans.map(move |range| {
718 let offsets = &offsets[range.range()];
719 let lengths = &lengths[range.range()];
720 izip!(offsets, lengths)
721 .map(|(&idx, &len)| ArrowString::from(values.slice_with_length(idx as _, len)))
722 .collect_vec()
723 }))
724 }
725}
726
727impl ChunkComponentSlicer for bool {
728 type Item<'a> = ArrowBooleanBuffer;
729
730 fn slice<'a>(
731 component: ComponentIdentifier,
732 array: &'a dyn ArrowArray,
733 component_spans: impl Iterator<Item = Span<usize>> + 'a,
734 ) -> impl Iterator<Item = Self::Item<'a>> {
735 let Some(values) = array.downcast_array_ref::<ArrowBooleanArray>() else {
736 error_on_downcast_failure(component, "ArrowBooleanArray", array.data_type());
737 return Either::Left(std::iter::empty());
738 };
739 let values = values.values().clone();
740
741 Either::Right(
743 component_spans.map(move |Span { start, len }| values.clone().slice(start, len)),
744 )
745 }
746}
747
/// Owning iterator over the `(TimeInt, RowId)` indices of a chunk.
///
/// It holds the chunk through an `Arc` rather than a borrow; it is created by
/// `Chunk::iter_indices_owned`.
pub struct ChunkIndicesIter {
    /// The chunk whose row ids are iterated.
    chunk: Arc<Chunk>,

    /// The time column the times are read from; `None` makes every row report
    /// `TimeInt::STATIC`.
    time_column: Option<TimeColumn>,
    /// Position of the next row to yield.
    index: usize,
}
756
757impl Iterator for ChunkIndicesIter {
758 type Item = (TimeInt, RowId);
759
760 fn next(&mut self) -> Option<Self::Item> {
761 let i = self.index;
762 self.index += 1;
763
764 let row_id = *self.chunk.row_ids_slice().get(i)?;
765
766 if let Some(time_column) = &self.time_column {
767 let time = *time_column.times_raw().get(i)?;
768 let time = TimeInt::new_temporal(time);
769 Some((time, row_id))
770 } else {
771 Some((TimeInt::STATIC, row_id))
772 }
773 }
774}
775
776impl Chunk {
777 #[inline]
786 pub fn iter_indices_owned(
787 self: Arc<Self>,
788 timeline: &TimelineName,
789 ) -> impl Iterator<Item = (TimeInt, RowId)> + use<> {
790 if self.is_static() {
791 Either::Left(ChunkIndicesIter {
792 chunk: self,
793 time_column: None,
794 index: 0,
795 })
796 } else {
797 self.timelines.get(timeline).cloned().map_or_else(
798 || Either::Right(Either::Left(std::iter::empty())),
799 |time_column| {
800 Either::Right(Either::Right(ChunkIndicesIter {
801 chunk: self,
802 time_column: Some(time_column),
803 index: 0,
804 }))
805 },
806 )
807 }
808 }
809}
810
/// Iterator over the deserialized data of a component, one item per span.
///
/// Created by `Chunk::iter_component`. Every yielded `ChunkComponentIterItem` shares the same
/// `values` allocation and only differs by its span.
pub struct ChunkComponentIter<C, IO> {
    /// All the deserialized component values, shared with every yielded item.
    values: Arc<Vec<C>>,
    /// Source of the spans into `values`, one per yielded item.
    offsets: IO,
}
818
/// The item type of `ChunkComponentIter`: a span into a shared vector of component values.
///
/// It dereferences to the `[C]` slice that the span covers.
///
/// NOTE(review): the derived `PartialEq` compares the whole backing vector and the span,
/// whereas the comparisons against `[C]` and `Vec<C>` only look at the covered slice —
/// confirm this asymmetry is intended.
#[derive(Clone, PartialEq)]
pub struct ChunkComponentIterItem<C> {
    /// The shared values this item points into.
    values: Arc<Vec<C>>,
    /// The part of `values` that belongs to this item.
    span: Span<usize>,
}
830
831impl<C: PartialEq> PartialEq<[C]> for ChunkComponentIterItem<C> {
832 fn eq(&self, rhs: &[C]) -> bool {
833 self.as_slice().eq(rhs)
834 }
835}
836
837impl<C: PartialEq> PartialEq<Vec<C>> for ChunkComponentIterItem<C> {
838 fn eq(&self, rhs: &Vec<C>) -> bool {
839 self.as_slice().eq(rhs)
840 }
841}
842
// Marker impl: the derived `PartialEq` is a full equivalence whenever `C: Eq`.
impl<C: Eq> Eq for ChunkComponentIterItem<C> {}
844
845impl<C> Default for ChunkComponentIterItem<C> {
847 #[inline]
848 fn default() -> Self {
849 Self {
850 values: Arc::new(Vec::new()),
851 span: Span::default(),
852 }
853 }
854}
855
856impl<C> ChunkComponentIterItem<C> {
857 #[inline]
858 pub fn as_slice(&self) -> &[C] {
859 &self.values[self.span.range()]
860 }
861}
862
863impl<C> std::ops::Deref for ChunkComponentIterItem<C> {
864 type Target = [C];
865
866 #[inline]
867 fn deref(&self) -> &Self::Target {
868 self.as_slice()
869 }
870}
871
872impl<C: Component, IO: Iterator<Item = Span<usize>>> Iterator for ChunkComponentIter<C, IO> {
873 type Item = ChunkComponentIterItem<C>;
874
875 #[inline]
876 fn next(&mut self) -> Option<Self::Item> {
877 self.offsets.next().map(move |span| ChunkComponentIterItem {
878 values: Arc::clone(&self.values),
879 span,
880 })
881 }
882}
883
884impl Chunk {
885 #[inline]
900 pub fn iter_component<C: Component>(
901 &self,
902 component: ComponentIdentifier,
903 ) -> ChunkComponentIter<C, impl Iterator<Item = Span<usize>> + '_ + use<'_, C>> {
904 let Some(list_array) = self.components.get_array(component) else {
905 return ChunkComponentIter {
906 values: Arc::new(vec![]),
907 offsets: Either::Left(std::iter::empty()),
908 };
909 };
910
911 let values = arrow::array::ArrayRef::from(list_array.values().clone());
912 let values = match C::from_arrow(&values) {
913 Ok(values) => values,
914 Err(err) => {
915 re_log::debug_panic!(
916 "deserialization failed for {}, data discarded: {}",
917 C::name(),
918 re_error::format_ref(&err),
919 );
920
921 re_log::error_once!(
922 "deserialization failed for {}, data discarded: {}",
923 C::name(),
924 re_error::format_ref(&err),
925 );
926
927 return ChunkComponentIter {
928 values: Arc::new(vec![]),
929 offsets: Either::Left(std::iter::empty()),
930 };
931 }
932 };
933
934 ChunkComponentIter {
936 values: Arc::new(values),
937 offsets: Either::Right(self.iter_component_offsets(component)),
938 }
939 }
940}
941
// Tests for `Chunk::iter_indices_owned`, covering both a temporal and a static chunk.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use itertools::{Itertools as _, izip};
    use re_log_types::example_components::{MyPoint, MyPoints};
    use re_log_types::{EntityPath, TimeInt, TimePoint};

    use crate::{Chunk, RowId, Timeline};

    /// On a temporal chunk, the owned iterator must yield the times of the requested timeline
    /// paired with the row ids.
    #[test]
    fn iter_indices_temporal() -> anyhow::Result<()> {
        let entity_path = EntityPath::from("this/that");

        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();
        let row_id5 = RowId::new();

        let timeline_frame = Timeline::new_sequence("frame");

        // One distinct frame number per row.
        let timepoint1 = [(timeline_frame, 1)];
        let timepoint2 = [(timeline_frame, 3)];
        let timepoint3 = [(timeline_frame, 5)];
        let timepoint4 = [(timeline_frame, 7)];
        let timepoint5 = [(timeline_frame, 9)];

        let points1 = &[MyPoint::new(1.0, 1.0)];
        let points2 = &[MyPoint::new(2.0, 2.0)];
        let points3 = &[MyPoint::new(3.0, 3.0)];
        let points4 = &[MyPoint::new(4.0, 4.0)];
        let points5 = &[MyPoint::new(5.0, 5.0)];

        let chunk = Arc::new(
            Chunk::builder(entity_path.clone())
                .with_component_batches(
                    row_id1,
                    timepoint1,
                    [(MyPoints::descriptor_points(), points1 as _)],
                )
                .with_component_batches(
                    row_id2,
                    timepoint2,
                    [(MyPoints::descriptor_points(), points2 as _)],
                )
                .with_component_batches(
                    row_id3,
                    timepoint3,
                    [(MyPoints::descriptor_points(), points3 as _)],
                )
                .with_component_batches(
                    row_id4,
                    timepoint4,
                    [(MyPoints::descriptor_points(), points4 as _)],
                )
                .with_component_batches(
                    row_id5,
                    timepoint5,
                    [(MyPoints::descriptor_points(), points5 as _)],
                )
                .build()?,
        );

        {
            let got = Arc::clone(&chunk)
                .iter_indices_owned(timeline_frame.name())
                .collect_vec();
            // The expectation is rebuilt straight from the chunk's time column and row ids.
            let expected = izip!(
                chunk
                    .timelines
                    .get(timeline_frame.name())
                    .map(|time_column| time_column.times().collect_vec())
                    .unwrap_or_default(),
                chunk.row_ids()
            )
            .collect_vec();

            similar_asserts::assert_eq!(expected, got);
        }

        Ok(())
    }

    /// On a static chunk, the owned iterator must pair every row id with `TimeInt::STATIC`,
    /// whatever timeline is requested.
    #[test]
    fn iter_indices_static() -> anyhow::Result<()> {
        let entity_path = EntityPath::from("this/that");

        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();
        let row_id5 = RowId::new();

        let timeline_frame = Timeline::new_sequence("frame");

        let points1 = &[MyPoint::new(1.0, 1.0)];
        let points2 = &[MyPoint::new(2.0, 2.0)];
        let points3 = &[MyPoint::new(3.0, 3.0)];
        let points4 = &[MyPoint::new(4.0, 4.0)];
        let points5 = &[MyPoint::new(5.0, 5.0)];

        // Every row is logged with an empty timepoint.
        let chunk = Arc::new(
            Chunk::builder(entity_path.clone())
                .with_component_batches(
                    row_id1,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points1 as _)],
                )
                .with_component_batches(
                    row_id2,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points2 as _)],
                )
                .with_component_batches(
                    row_id3,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points3 as _)],
                )
                .with_component_batches(
                    row_id4,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points4 as _)],
                )
                .with_component_batches(
                    row_id5,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points5 as _)],
                )
                .build()?,
        );

        {
            let got = Arc::clone(&chunk)
                .iter_indices_owned(timeline_frame.name())
                .collect_vec();
            let expected = izip!(std::iter::repeat(TimeInt::STATIC), chunk.row_ids()).collect_vec();

            similar_asserts::assert_eq!(expected, got);
        }

        Ok(())
    }
}