1use std::sync::Arc;
2
3use arrow::{
4 array::{
5 Array as ArrowArray, ArrayRef as ArrowArrayRef, ArrowPrimitiveType, BinaryArray,
6 BooleanArray as ArrowBooleanArray, FixedSizeListArray as ArrowFixedSizeListArray,
7 LargeBinaryArray, ListArray as ArrowListArray, PrimitiveArray as ArrowPrimitiveArray,
8 StringArray as ArrowStringArray, StructArray as ArrowStructArray,
9 },
10 buffer::{BooleanBuffer as ArrowBooleanBuffer, Buffer, ScalarBuffer as ArrowScalarBuffer},
11 datatypes::ArrowNativeType,
12};
13use itertools::{Either, Itertools as _, izip};
14
15use re_arrow_util::ArrowArrayDowncastRef as _;
16use re_log_types::{TimeInt, TimePoint, TimelineName};
17use re_span::Span;
18use re_types_core::{ArrowString, Component, ComponentDescriptor};
19
20use crate::{Chunk, RowId, TimeColumn};
21
22impl Chunk {
32 pub fn raw_component_array(
36 &self,
37 component_descr: &ComponentDescriptor,
38 ) -> Option<&ArrowArrayRef> {
39 self.components
40 .get(component_descr)
41 .map(|list_array| list_array.values())
42 }
43
44 #[inline]
52 pub fn iter_indices(
53 &self,
54 timeline: &TimelineName,
55 ) -> impl Iterator<Item = (TimeInt, RowId)> + '_ + use<'_> {
56 if self.is_static() {
57 Either::Right(Either::Left(izip!(
58 std::iter::repeat(TimeInt::STATIC),
59 self.row_ids()
60 )))
61 } else {
62 let Some(time_column) = self.timelines.get(timeline) else {
63 return Either::Left(std::iter::empty());
64 };
65
66 Either::Right(Either::Right(izip!(time_column.times(), self.row_ids())))
67 }
68 }
69
70 pub fn iter_component_indices(
80 &self,
81 timeline: &TimelineName,
82 component_descr: &ComponentDescriptor,
83 ) -> impl Iterator<Item = (TimeInt, RowId)> + '_ + use<'_> {
84 let Some(list_array) = self.components.get(component_descr) else {
85 return Either::Left(std::iter::empty());
86 };
87
88 if self.is_static() {
89 let indices = izip!(std::iter::repeat(TimeInt::STATIC), self.row_ids());
90
91 if let Some(validity) = list_array.nulls() {
92 Either::Right(Either::Left(Either::Left(
93 indices
94 .enumerate()
95 .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
96 )))
97 } else {
98 Either::Right(Either::Left(Either::Right(indices)))
99 }
100 } else {
101 let Some(time_column) = self.timelines.get(timeline) else {
102 return Either::Left(std::iter::empty());
103 };
104
105 let indices = izip!(time_column.times(), self.row_ids());
106
107 if let Some(validity) = list_array.nulls() {
108 Either::Right(Either::Right(Either::Left(
109 indices
110 .enumerate()
111 .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
112 )))
113 } else {
114 Either::Right(Either::Right(Either::Right(indices)))
115 }
116 }
117 }
118
119 #[inline]
124 pub fn iter_timepoints(&self) -> impl Iterator<Item = TimePoint> + '_ {
125 let mut timelines = self
126 .timelines
127 .values()
128 .map(|time_column| (time_column.timeline, time_column.times()))
129 .collect_vec();
130
131 std::iter::from_fn(move || {
132 let mut timepoint = TimePoint::default();
133 for (timeline, times) in &mut timelines {
134 timepoint.insert(*timeline, times.next()?);
135 }
136 Some(timepoint)
137 })
138 }
139
140 pub fn iter_component_timepoints(
147 &self,
148 component_descr: &ComponentDescriptor,
149 ) -> impl Iterator<Item = TimePoint> + '_ + use<'_> {
150 let Some(list_array) = self.components.get(component_descr) else {
151 return Either::Left(std::iter::empty());
152 };
153
154 if let Some(validity) = list_array.nulls() {
155 let mut timelines = self
156 .timelines
157 .values()
158 .map(|time_column| {
159 (
160 time_column.timeline,
161 time_column
162 .times()
163 .enumerate()
164 .filter(|(i, _)| validity.is_valid(*i))
165 .map(|(_, time)| time),
166 )
167 })
168 .collect_vec();
169
170 Either::Right(Either::Left(std::iter::from_fn(move || {
171 let mut timepoint = TimePoint::default();
172 for (timeline, times) in &mut timelines {
173 timepoint.insert(*timeline, times.next()?);
174 }
175 Some(timepoint)
176 })))
177 } else {
178 let mut timelines = self
179 .timelines
180 .values()
181 .map(|time_column| (time_column.timeline, time_column.times()))
182 .collect_vec();
183
184 Either::Right(Either::Right(std::iter::from_fn(move || {
185 let mut timepoint = TimePoint::default();
186 for (timeline, times) in &mut timelines {
187 timepoint.insert(*timeline, times.next()?);
188 }
189 Some(timepoint)
190 })))
191 }
192 }
193
194 pub fn iter_component_offsets<'a>(
200 &'a self,
201 component_descriptor: &ComponentDescriptor,
202 ) -> impl Iterator<Item = Span<usize>> + 'a + use<'a> {
203 let Some(list_array) = self.components.get(component_descriptor) else {
204 return Either::Left(std::iter::empty());
205 };
206
207 let offsets = list_array.offsets().iter().map(|idx| *idx as usize);
208 let lengths = list_array.offsets().lengths();
209
210 if let Some(validity) = list_array.nulls() {
211 Either::Right(Either::Left(
212 izip!(offsets, lengths)
213 .enumerate()
214 .filter_map(|(i, o)| validity.is_valid(i).then_some(o))
215 .map(|(start, len)| Span { start, len }),
216 ))
217 } else {
218 Either::Right(Either::Right(
219 izip!(offsets, lengths).map(|(start, len)| Span { start, len }),
220 ))
221 }
222 }
223
224 #[inline]
235 pub fn iter_slices<'a, S: 'a + ChunkComponentSlicer>(
236 &'a self,
237 component_descriptor: ComponentDescriptor,
238 ) -> impl Iterator<Item = S::Item<'a>> + 'a + use<'a, S> {
239 let Some(list_array) = self.components.get(&component_descriptor) else {
240 return Either::Left(std::iter::empty());
241 };
242
243 let component_offset_values = self.iter_component_offsets(&component_descriptor);
244
245 Either::Right(S::slice(
246 component_descriptor,
247 &**list_array.values() as _,
248 component_offset_values,
249 ))
250 }
251
252 pub fn iter_slices_from_struct_field<'a, S: 'a + ChunkComponentSlicer>(
265 &'a self,
266 component_descriptor: ComponentDescriptor,
267 field_name: &'a str,
268 ) -> impl Iterator<Item = S::Item<'a>> + 'a {
269 let Some(list_array) = self.components.get(&component_descriptor) else {
270 return Either::Left(std::iter::empty());
271 };
272
273 let Some(struct_array) = list_array.values().downcast_array_ref::<ArrowStructArray>()
274 else {
275 if cfg!(debug_assertions) {
276 panic!("downcast failed for {component_descriptor}, data discarded");
277 } else {
278 re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
279 }
280 return Either::Left(std::iter::empty());
281 };
282
283 let Some(field_idx) = struct_array
284 .fields()
285 .iter()
286 .enumerate()
287 .find_map(|(i, field)| (field.name() == field_name).then_some(i))
288 else {
289 if cfg!(debug_assertions) {
290 panic!("field {field_name} not found for {component_descriptor}, data discarded");
291 } else {
292 re_log::error_once!(
293 "field {field_name} not found for {component_descriptor}, data discarded"
294 );
295 }
296 return Either::Left(std::iter::empty());
297 };
298
299 if field_idx >= struct_array.num_columns() {
300 if cfg!(debug_assertions) {
301 panic!("field {field_name} not found for {component_descriptor}, data discarded");
302 } else {
303 re_log::error_once!(
304 "field {field_name} not found for {component_descriptor}, data discarded"
305 );
306 return Either::Left(std::iter::empty());
307 }
308 }
309
310 let component_offset_values = self.iter_component_offsets(&component_descriptor);
311
312 Either::Right(S::slice(
313 component_descriptor,
314 struct_array.column(field_idx),
315 component_offset_values,
316 ))
317 }
318}
319
320pub trait ChunkComponentSlicer {
326 type Item<'a>;
327
328 fn slice<'a>(
329 component_descriptor: ComponentDescriptor,
332 array: &'a dyn ArrowArray,
333 component_spans: impl Iterator<Item = Span<usize>> + 'a,
334 ) -> impl Iterator<Item = Self::Item<'a>> + 'a;
335}
336
337#[expect(clippy::needless_pass_by_value)] fn slice_as_native<'a, P, T>(
340 component_descriptor: ComponentDescriptor,
341 array: &'a dyn ArrowArray,
342 component_spans: impl Iterator<Item = Span<usize>> + 'a,
343) -> impl Iterator<Item = &'a [T]> + 'a
344where
345 P: ArrowPrimitiveType<Native = T>,
346 T: ArrowNativeType,
347{
348 let Some(values) = array.downcast_array_ref::<ArrowPrimitiveArray<P>>() else {
349 if cfg!(debug_assertions) {
350 panic!("downcast failed for {component_descriptor}, data discarded");
351 } else {
352 re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
353 }
354 return Either::Left(std::iter::empty());
355 };
356 let values = values.values().as_ref();
357
358 Either::Right(component_spans.map(move |range| &values[range.range()]))
360}
361
362macro_rules! impl_native_type {
364 ($arrow_primitive_type:ty, $native_type:ty) => {
365 impl ChunkComponentSlicer for $native_type {
366 type Item<'a> = &'a [$native_type];
367
368 fn slice<'a>(
369 component_descriptor: ComponentDescriptor,
370 array: &'a dyn ArrowArray,
371 component_spans: impl Iterator<Item = Span<usize>> + 'a,
372 ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
373 slice_as_native::<$arrow_primitive_type, $native_type>(
374 component_descriptor,
375 array,
376 component_spans,
377 )
378 }
379 }
380 };
381}
382
383impl_native_type!(arrow::array::types::UInt8Type, u8);
384impl_native_type!(arrow::array::types::UInt16Type, u16);
385impl_native_type!(arrow::array::types::UInt32Type, u32);
386impl_native_type!(arrow::array::types::UInt64Type, u64);
387impl_native_type!(arrow::array::types::Int8Type, i8);
389impl_native_type!(arrow::array::types::Int16Type, i16);
390impl_native_type!(arrow::array::types::Int32Type, i32);
391impl_native_type!(arrow::array::types::Int64Type, i64);
392impl_native_type!(arrow::array::types::Float16Type, half::f16);
394impl_native_type!(arrow::array::types::Float32Type, f32);
395impl_native_type!(arrow::array::types::Float64Type, f64);
396
397#[expect(clippy::needless_pass_by_value)] fn slice_as_array_native<'a, const N: usize, P, T>(
400 component_descriptor: ComponentDescriptor,
401 array: &'a dyn ArrowArray,
402 component_spans: impl Iterator<Item = Span<usize>> + 'a,
403) -> impl Iterator<Item = &'a [[T; N]]> + 'a
404where
405 [T; N]: bytemuck::Pod,
406 P: ArrowPrimitiveType<Native = T>,
407 T: ArrowNativeType + bytemuck::Pod,
408{
409 let Some(fixed_size_list_array) = array.downcast_array_ref::<ArrowFixedSizeListArray>() else {
410 if cfg!(debug_assertions) {
411 panic!("downcast failed for {component_descriptor}, data discarded");
412 } else {
413 re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
414 }
415 return Either::Left(std::iter::empty());
416 };
417
418 let Some(values) = fixed_size_list_array
419 .values()
420 .downcast_array_ref::<ArrowPrimitiveArray<P>>()
421 else {
422 if cfg!(debug_assertions) {
423 panic!("downcast failed for {component_descriptor}, data discarded");
424 } else {
425 re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
426 }
427 return Either::Left(std::iter::empty());
428 };
429
430 let size = fixed_size_list_array.value_length() as usize;
431 let values = values.values().as_ref();
432
433 Either::Right(
435 component_spans.map(move |span| bytemuck::cast_slice(&values[(span * size).range()])),
436 )
437}
438
439macro_rules! impl_array_native_type {
441 ($arrow_primitive_type:ty, $native_type:ty) => {
442 impl<const N: usize> ChunkComponentSlicer for [$native_type; N]
443 where
444 [$native_type; N]: bytemuck::Pod,
445 {
446 type Item<'a> = &'a [[$native_type; N]];
447
448 fn slice<'a>(
449 component_descriptor: ComponentDescriptor,
450 array: &'a dyn ArrowArray,
451 component_spans: impl Iterator<Item = Span<usize>> + 'a,
452 ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
453 slice_as_array_native::<N, $arrow_primitive_type, $native_type>(
454 component_descriptor,
455 array,
456 component_spans,
457 )
458 }
459 }
460 };
461}
462
463impl_array_native_type!(arrow::array::types::UInt8Type, u8);
464impl_array_native_type!(arrow::array::types::UInt16Type, u16);
465impl_array_native_type!(arrow::array::types::UInt32Type, u32);
466impl_array_native_type!(arrow::array::types::UInt64Type, u64);
467impl_array_native_type!(arrow::array::types::Int8Type, i8);
469impl_array_native_type!(arrow::array::types::Int16Type, i16);
470impl_array_native_type!(arrow::array::types::Int32Type, i32);
471impl_array_native_type!(arrow::array::types::Int64Type, i64);
472impl_array_native_type!(arrow::array::types::Float16Type, half::f16);
474impl_array_native_type!(arrow::array::types::Float32Type, f32);
475impl_array_native_type!(arrow::array::types::Float64Type, f64);
476
477#[expect(clippy::needless_pass_by_value)] fn slice_as_buffer_native<'a, P, T>(
480 component_descriptor: ComponentDescriptor,
481 array: &'a dyn ArrowArray,
482 component_spans: impl Iterator<Item = Span<usize>> + 'a,
483) -> impl Iterator<Item = Vec<ArrowScalarBuffer<T>>> + 'a
484where
485 P: ArrowPrimitiveType<Native = T>,
486 T: ArrowNativeType,
487{
488 let Some(inner_list_array) = array.downcast_array_ref::<ArrowListArray>() else {
489 if cfg!(debug_assertions) {
490 panic!(
491 "DEBUG BUILD: {component_descriptor} had unexpected datatype: {:?}",
492 array.data_type()
493 );
494 } else {
495 re_log::error_once!(
496 "{component_descriptor} had unexpected datatype: {:?}. Data discarded",
497 array.data_type()
498 );
499 return Either::Left(std::iter::empty());
500 }
501 };
502
503 let Some(values) = inner_list_array
504 .values()
505 .downcast_array_ref::<ArrowPrimitiveArray<P>>()
506 else {
507 if cfg!(debug_assertions) {
508 panic!(
509 "DEBUG BUILD: {component_descriptor} had unexpected datatype: {:?}",
510 array.data_type()
511 );
512 } else {
513 re_log::error_once!(
514 "{component_descriptor} had unexpected datatype: {:?}. Data discarded",
515 array.data_type()
516 );
517 return Either::Left(std::iter::empty());
518 }
519 };
520
521 let values = values.values();
522 let offsets = inner_list_array.offsets();
523 let lengths = offsets.lengths().collect_vec();
524
525 Either::Right(component_spans.map(move |span| {
527 let offsets = &offsets[span.range()];
528 let lengths = &lengths[span.range()];
529 izip!(offsets, lengths)
530 .map(|(&idx, &len)| values.clone().slice(idx as _, len))
532 .collect_vec()
533 }))
534}
535
536fn slice_as_u8<'a>(
538 component_descriptor: ComponentDescriptor,
539 array: &'a dyn ArrowArray,
540 component_spans: impl Iterator<Item = Span<usize>> + 'a,
541) -> impl Iterator<Item = Vec<Buffer>> + 'a {
542 if let Some(binary_array) = array.downcast_array_ref::<BinaryArray>() {
543 let values = binary_array.values();
544 let offsets = binary_array.offsets();
545 let lengths = offsets.lengths().collect_vec();
546
547 Either::Left(Either::Left(component_spans.map(move |span| {
549 let offsets = &offsets[span.range()];
550 let lengths = &lengths[span.range()];
551 izip!(offsets, lengths)
552 .map(|(&idx, &len)| values.clone().slice_with_length(idx as _, len))
554 .collect_vec()
555 })))
556 } else if let Some(binary_array) = array.downcast_array_ref::<LargeBinaryArray>() {
557 let values = binary_array.values();
558 let offsets = binary_array.offsets();
559 let lengths = offsets.lengths().collect_vec();
560
561 Either::Left(Either::Right(component_spans.map(move |span| {
563 let offsets = &offsets[span.range()];
564 let lengths = &lengths[span.range()];
565 izip!(offsets, lengths)
566 .map(|(&idx, &len)| values.clone().slice_with_length(idx as _, len))
568 .collect_vec()
569 })))
570 } else {
571 Either::Right(
572 slice_as_buffer_native::<arrow::array::types::UInt8Type, u8>(
573 component_descriptor,
574 array,
575 component_spans,
576 )
577 .map(|scalar_buffers| {
578 scalar_buffers
579 .into_iter()
580 .map(|scalar_buffer| scalar_buffer.into_inner())
581 .collect_vec()
582 }),
583 )
584 }
585}
586
587macro_rules! impl_buffer_native_type {
589 ($primitive_type:ty, $native_type:ty) => {
590 impl ChunkComponentSlicer for &[$native_type] {
591 type Item<'a> = Vec<ArrowScalarBuffer<$native_type>>;
592
593 fn slice<'a>(
594 component_descriptor: ComponentDescriptor,
595 array: &'a dyn ArrowArray,
596 component_spans: impl Iterator<Item = Span<usize>> + 'a,
597 ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
598 slice_as_buffer_native::<$primitive_type, $native_type>(
599 component_descriptor,
600 array,
601 component_spans,
602 )
603 }
604 }
605 };
606}
607
608impl ChunkComponentSlicer for &[u8] {
610 type Item<'a> = Vec<Buffer>;
611
612 fn slice<'a>(
613 component_descriptor: ComponentDescriptor,
614 array: &'a dyn ArrowArray,
615 component_spans: impl Iterator<Item = Span<usize>> + 'a,
616 ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
617 slice_as_u8(component_descriptor, array, component_spans)
618 }
619}
620
621impl_buffer_native_type!(arrow::array::types::UInt16Type, u16);
622impl_buffer_native_type!(arrow::array::types::UInt32Type, u32);
623impl_buffer_native_type!(arrow::array::types::UInt64Type, u64);
624impl_buffer_native_type!(arrow::array::types::Int8Type, i8);
626impl_buffer_native_type!(arrow::array::types::Int16Type, i16);
627impl_buffer_native_type!(arrow::array::types::Int32Type, i32);
628impl_buffer_native_type!(arrow::array::types::Int64Type, i64);
629impl_buffer_native_type!(arrow::array::types::Float16Type, half::f16);
631impl_buffer_native_type!(arrow::array::types::Float32Type, f32);
632impl_buffer_native_type!(arrow::array::types::Float64Type, f64);
633
634#[expect(clippy::needless_pass_by_value)] fn slice_as_array_list_native<'a, const N: usize, P, T>(
637 component_descriptor: ComponentDescriptor,
638 array: &'a dyn ArrowArray,
639 component_spans: impl Iterator<Item = Span<usize>> + 'a,
640) -> impl Iterator<Item = Vec<&'a [[T; N]]>> + 'a
641where
642 [T; N]: bytemuck::Pod,
643 P: ArrowPrimitiveType<Native = T>,
644 T: ArrowNativeType + bytemuck::Pod,
645{
646 let Some(inner_list_array) = array.downcast_array_ref::<ArrowListArray>() else {
647 if cfg!(debug_assertions) {
648 panic!("downcast failed for {component_descriptor}, data discarded");
649 } else {
650 re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
651 }
652 return Either::Left(std::iter::empty());
653 };
654
655 let inner_offsets = inner_list_array.offsets();
656 let inner_lengths = inner_offsets.lengths().collect_vec();
657
658 let Some(fixed_size_list_array) = inner_list_array
659 .values()
660 .downcast_array_ref::<ArrowFixedSizeListArray>()
661 else {
662 if cfg!(debug_assertions) {
663 panic!("downcast failed for {component_descriptor}, data discarded");
664 } else {
665 re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
666 }
667 return Either::Left(std::iter::empty());
668 };
669
670 let Some(values) = fixed_size_list_array
671 .values()
672 .downcast_array_ref::<ArrowPrimitiveArray<P>>()
673 else {
674 if cfg!(debug_assertions) {
675 panic!("downcast failed for {component_descriptor}, data discarded");
676 } else {
677 re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
678 }
679 return Either::Left(std::iter::empty());
680 };
681
682 let size = fixed_size_list_array.value_length() as usize;
683 let values = values.values();
684
685 Either::Right(component_spans.map(move |span| {
687 let inner_offsets = &inner_offsets[span.range()];
688 let inner_lengths = &inner_lengths[span.range()];
689 izip!(inner_offsets, inner_lengths)
690 .map(|(&idx, &len)| {
691 let idx = idx as usize;
692 bytemuck::cast_slice(&values[idx * size..idx * size + len * size])
693 })
694 .collect_vec()
695 }))
696}
697
698macro_rules! impl_array_list_native_type {
700 ($primitive_type:ty, $native_type:ty) => {
701 impl<const N: usize> ChunkComponentSlicer for &[[$native_type; N]]
702 where
703 [$native_type; N]: bytemuck::Pod,
704 {
705 type Item<'a> = Vec<&'a [[$native_type; N]]>;
706
707 fn slice<'a>(
708 component_descriptor: ComponentDescriptor,
709 array: &'a dyn ArrowArray,
710 component_spans: impl Iterator<Item = Span<usize>> + 'a,
711 ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
712 slice_as_array_list_native::<N, $primitive_type, $native_type>(
713 component_descriptor,
714 array,
715 component_spans,
716 )
717 }
718 }
719 };
720}
721
722impl_array_list_native_type!(arrow::array::types::UInt8Type, u8);
723impl_array_list_native_type!(arrow::array::types::UInt16Type, u16);
724impl_array_list_native_type!(arrow::array::types::UInt32Type, u32);
725impl_array_list_native_type!(arrow::array::types::UInt64Type, u64);
726impl_array_list_native_type!(arrow::array::types::Int8Type, i8);
728impl_array_list_native_type!(arrow::array::types::Int16Type, i16);
729impl_array_list_native_type!(arrow::array::types::Int32Type, i32);
730impl_array_list_native_type!(arrow::array::types::Int64Type, i64);
731impl_array_list_native_type!(arrow::array::types::Float16Type, half::f16);
733impl_array_list_native_type!(arrow::array::types::Float32Type, f32);
734impl_array_list_native_type!(arrow::array::types::Float64Type, f64);
735
736impl ChunkComponentSlicer for String {
737 type Item<'a> = Vec<ArrowString>;
738
739 fn slice<'a>(
740 component_descriptor: ComponentDescriptor,
741 array: &'a dyn ArrowArray,
742 component_spans: impl Iterator<Item = Span<usize>> + 'a,
743 ) -> impl Iterator<Item = Vec<ArrowString>> + 'a {
744 let Some(utf8_array) = array.downcast_array_ref::<ArrowStringArray>() else {
745 if cfg!(debug_assertions) {
746 panic!("downcast failed for {component_descriptor}, data discarded");
747 } else {
748 re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
749 }
750 return Either::Left(std::iter::empty());
751 };
752
753 let values = utf8_array.values().clone();
754 let offsets = utf8_array.offsets().clone();
755 let lengths = offsets.lengths().collect_vec();
756
757 Either::Right(component_spans.map(move |range| {
759 let offsets = &offsets[range.range()];
760 let lengths = &lengths[range.range()];
761 izip!(offsets, lengths)
762 .map(|(&idx, &len)| ArrowString::from(values.slice_with_length(idx as _, len)))
763 .collect_vec()
764 }))
765 }
766}
767
768impl ChunkComponentSlicer for bool {
769 type Item<'a> = ArrowBooleanBuffer;
770
771 fn slice<'a>(
772 component_descriptor: ComponentDescriptor,
773 array: &'a dyn ArrowArray,
774 component_spans: impl Iterator<Item = Span<usize>> + 'a,
775 ) -> impl Iterator<Item = Self::Item<'a>> + 'a {
776 let Some(values) = array.downcast_array_ref::<ArrowBooleanArray>() else {
777 if cfg!(debug_assertions) {
778 panic!("downcast failed for {component_descriptor}, data discarded");
779 } else {
780 re_log::error_once!("downcast failed for {component_descriptor}, data discarded");
781 }
782 return Either::Left(std::iter::empty());
783 };
784 let values = values.values().clone();
785
786 Either::Right(
788 component_spans.map(move |Span { start, len }| values.clone().slice(start, len)),
789 )
790 }
791}
792
793pub struct ChunkIndicesIter {
796 chunk: Arc<Chunk>,
797
798 time_column: Option<TimeColumn>,
799 index: usize,
800}
801
802impl Iterator for ChunkIndicesIter {
803 type Item = (TimeInt, RowId);
804
805 fn next(&mut self) -> Option<Self::Item> {
806 let i = self.index;
807 self.index += 1;
808
809 let row_id = *self.chunk.row_ids_slice().get(i)?;
810
811 if let Some(time_column) = &self.time_column {
812 let time = *time_column.times_raw().get(i)?;
813 let time = TimeInt::new_temporal(time);
814 Some((time, row_id))
815 } else {
816 Some((TimeInt::STATIC, row_id))
817 }
818 }
819}
820
821impl Chunk {
822 #[inline]
831 pub fn iter_indices_owned(
832 self: Arc<Self>,
833 timeline: &TimelineName,
834 ) -> impl Iterator<Item = (TimeInt, RowId)> + use<> {
835 if self.is_static() {
836 Either::Left(ChunkIndicesIter {
837 chunk: self,
838 time_column: None,
839 index: 0,
840 })
841 } else {
842 self.timelines.get(timeline).cloned().map_or_else(
843 || Either::Right(Either::Left(std::iter::empty())),
844 |time_column| {
845 Either::Right(Either::Right(ChunkIndicesIter {
846 chunk: self,
847 time_column: Some(time_column),
848 index: 0,
849 }))
850 },
851 )
852 }
853 }
854}
855
856pub struct ChunkComponentIter<C, IO> {
860 values: Arc<Vec<C>>,
861 offsets: IO,
862}
863
864#[derive(Clone, PartialEq)]
871pub struct ChunkComponentIterItem<C> {
872 values: Arc<Vec<C>>,
873 span: Span<usize>,
874}
875
876impl<C: PartialEq> PartialEq<[C]> for ChunkComponentIterItem<C> {
877 fn eq(&self, rhs: &[C]) -> bool {
878 self.as_slice().eq(rhs)
879 }
880}
881
882impl<C: PartialEq> PartialEq<Vec<C>> for ChunkComponentIterItem<C> {
883 fn eq(&self, rhs: &Vec<C>) -> bool {
884 self.as_slice().eq(rhs)
885 }
886}
887
888impl<C: Eq> Eq for ChunkComponentIterItem<C> {}
889
890impl<C> Default for ChunkComponentIterItem<C> {
892 #[inline]
893 fn default() -> Self {
894 Self {
895 values: Arc::new(Vec::new()),
896 span: Span::default(),
897 }
898 }
899}
900
901impl<C> ChunkComponentIterItem<C> {
902 #[inline]
903 pub fn as_slice(&self) -> &[C] {
904 &self.values[self.span.range()]
905 }
906}
907
908impl<C> std::ops::Deref for ChunkComponentIterItem<C> {
909 type Target = [C];
910
911 #[inline]
912 fn deref(&self) -> &Self::Target {
913 self.as_slice()
914 }
915}
916
917impl<C: Component, IO: Iterator<Item = Span<usize>>> Iterator for ChunkComponentIter<C, IO> {
918 type Item = ChunkComponentIterItem<C>;
919
920 #[inline]
921 fn next(&mut self) -> Option<Self::Item> {
922 self.offsets.next().map(move |span| ChunkComponentIterItem {
923 values: Arc::clone(&self.values),
924 span,
925 })
926 }
927}
928
929impl Chunk {
930 #[inline]
945 pub fn iter_component<C: Component>(
946 &self,
947 component_descriptor: &ComponentDescriptor,
948 ) -> ChunkComponentIter<C, impl Iterator<Item = Span<usize>> + '_ + use<'_, C>> {
949 debug_assert_eq!(
950 component_descriptor.component_type,
951 Some(C::name()),
952 "component type mismatch"
953 );
954
955 let Some(list_array) = self.components.get(component_descriptor) else {
956 return ChunkComponentIter {
957 values: Arc::new(vec![]),
958 offsets: Either::Left(std::iter::empty()),
959 };
960 };
961
962 let values = arrow::array::ArrayRef::from(list_array.values().clone());
963 let values = match C::from_arrow(&values) {
964 Ok(values) => values,
965 Err(err) => {
966 if cfg!(debug_assertions) {
967 panic!(
968 "[DEBUG-ONLY] deserialization failed for {}, data discarded: {}",
969 C::name(),
970 re_error::format_ref(&err),
971 );
972 } else {
973 re_log::error_once!(
974 "deserialization failed for {}, data discarded: {}",
975 C::name(),
976 re_error::format_ref(&err),
977 );
978 }
979 return ChunkComponentIter {
980 values: Arc::new(vec![]),
981 offsets: Either::Left(std::iter::empty()),
982 };
983 }
984 };
985
986 ChunkComponentIter {
988 values: Arc::new(values),
989 offsets: Either::Right(self.iter_component_offsets(component_descriptor)),
990 }
991 }
992}
993
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use itertools::{Itertools as _, izip};
    use re_log_types::{
        EntityPath, TimeInt, TimePoint,
        example_components::{MyPoint, MyPoints},
    };

    use crate::{Chunk, RowId, Timeline};

    /// `iter_indices_owned` on a temporal chunk must yield the same `(time, row_id)` pairs as
    /// zipping the chunk's `frame` time column with its row IDs.
    #[test]
    fn iter_indices_temporal() -> anyhow::Result<()> {
        let entity_path = EntityPath::from("this/that");

        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();
        let row_id5 = RowId::new();

        let timeline_frame = Timeline::new_sequence("frame");

        // One row per frame, on odd frame numbers.
        let timepoint1 = [(timeline_frame, 1)];
        let timepoint2 = [(timeline_frame, 3)];
        let timepoint3 = [(timeline_frame, 5)];
        let timepoint4 = [(timeline_frame, 7)];
        let timepoint5 = [(timeline_frame, 9)];

        let points1 = &[MyPoint::new(1.0, 1.0)];
        let points2 = &[MyPoint::new(2.0, 2.0)];
        let points3 = &[MyPoint::new(3.0, 3.0)];
        let points4 = &[MyPoint::new(4.0, 4.0)];
        let points5 = &[MyPoint::new(5.0, 5.0)];

        let chunk = Arc::new(
            Chunk::builder(entity_path.clone())
                .with_component_batches(
                    row_id1,
                    timepoint1,
                    [(MyPoints::descriptor_points(), points1 as _)],
                )
                .with_component_batches(
                    row_id2,
                    timepoint2,
                    [(MyPoints::descriptor_points(), points2 as _)],
                )
                .with_component_batches(
                    row_id3,
                    timepoint3,
                    [(MyPoints::descriptor_points(), points3 as _)],
                )
                .with_component_batches(
                    row_id4,
                    timepoint4,
                    [(MyPoints::descriptor_points(), points4 as _)],
                )
                .with_component_batches(
                    row_id5,
                    timepoint5,
                    [(MyPoints::descriptor_points(), points5 as _)],
                )
                .build()?,
        );

        {
            // The owned iterator consumes a clone of the `Arc`; `chunk` stays usable for the
            // expected values below.
            let got = Arc::clone(&chunk)
                .iter_indices_owned(timeline_frame.name())
                .collect_vec();
            let expected = izip!(
                chunk
                    .timelines
                    .get(timeline_frame.name())
                    .map(|time_column| time_column.times().collect_vec())
                    .unwrap_or_default(),
                chunk.row_ids()
            )
            .collect_vec();

            similar_asserts::assert_eq!(expected, got);
        }

        Ok(())
    }

    /// `iter_indices_owned` on a static chunk must report every row at `TimeInt::STATIC`,
    /// regardless of the requested timeline.
    #[test]
    fn iter_indices_static() -> anyhow::Result<()> {
        let entity_path = EntityPath::from("this/that");

        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();
        let row_id5 = RowId::new();

        // Only used as the lookup key; the chunk itself has no timelines.
        let timeline_frame = Timeline::new_sequence("frame");

        let points1 = &[MyPoint::new(1.0, 1.0)];
        let points2 = &[MyPoint::new(2.0, 2.0)];
        let points3 = &[MyPoint::new(3.0, 3.0)];
        let points4 = &[MyPoint::new(4.0, 4.0)];
        let points5 = &[MyPoint::new(5.0, 5.0)];

        // Every row is logged with an empty `TimePoint`.
        let chunk = Arc::new(
            Chunk::builder(entity_path.clone())
                .with_component_batches(
                    row_id1,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points1 as _)],
                )
                .with_component_batches(
                    row_id2,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points2 as _)],
                )
                .with_component_batches(
                    row_id3,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points3 as _)],
                )
                .with_component_batches(
                    row_id4,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points4 as _)],
                )
                .with_component_batches(
                    row_id5,
                    TimePoint::default(),
                    [(MyPoints::descriptor_points(), points5 as _)],
                )
                .build()?,
        );

        {
            let got = Arc::clone(&chunk)
                .iter_indices_owned(timeline_frame.name())
                .collect_vec();
            // `repeat` is bounded by `izip!` stopping at the end of `row_ids()`.
            let expected = izip!(std::iter::repeat(TimeInt::STATIC), chunk.row_ids()).collect_vec();

            similar_asserts::assert_eq!(expected, got);
        }

        Ok(())
    }
}