1use arrow::array::{
2 Array as _, ArrayRef as ArrowArrayRef, BooleanArray as ArrowBooleanArray,
3 ListArray as ArrowListArray,
4};
5use itertools::Itertools as _;
6use nohash_hasher::IntSet;
7
8use re_log_types::TimelineName;
9use re_types_core::ComponentDescriptor;
10
11use crate::{Chunk, RowId, TimeColumn};
12
13impl Chunk {
19 pub fn cell(
25 &self,
26 row_id: RowId,
27 component_desc: &ComponentDescriptor,
28 ) -> Option<ArrowArrayRef> {
29 let list_array = self.components.get(component_desc)?;
30
31 if self.is_sorted() {
32 let row_ids = self.row_ids_slice();
33 let index = row_ids.binary_search(&row_id).ok()?;
34 list_array.is_valid(index).then(|| list_array.value(index))
35 } else {
36 let (index, _) = self.row_ids().find_position(|id| *id == row_id)?;
37 list_array.is_valid(index).then(|| list_array.value(index))
38 }
39 }
40
41 #[must_use]
51 #[inline]
52 pub fn row_sliced(&self, index: usize, len: usize) -> Self {
53 re_tracing::profile_function!();
54
55 let Self {
56 id,
57 entity_path,
58 heap_size_bytes: _,
59 is_sorted,
60 row_ids,
61 timelines,
62 components,
63 } = self;
64
65 if index >= self.num_rows() {
69 return self.emptied();
70 }
71
72 let end_offset = usize::min(index.saturating_add(len), self.num_rows());
73 let len = end_offset.saturating_sub(index);
74
75 if len == 0 {
76 return self.emptied();
77 }
78
79 let is_sorted = *is_sorted || (len < 2);
80
81 let mut chunk = Self {
82 id: *id,
83 entity_path: entity_path.clone(),
84 heap_size_bytes: Default::default(),
85 is_sorted,
86 row_ids: row_ids.clone().slice(index, len),
87 timelines: timelines
88 .iter()
89 .map(|(timeline, time_column)| (*timeline, time_column.row_sliced(index, len)))
90 .collect(),
91 components: components
92 .iter()
93 .map(|(component_desc, list_array)| {
94 (component_desc.clone(), list_array.clone().slice(index, len))
95 })
96 .collect(),
97 };
98
99 chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();
119
120 #[cfg(debug_assertions)]
121 #[allow(clippy::unwrap_used)] chunk.sanity_check().unwrap();
123
124 chunk
125 }
126
127 #[must_use]
137 #[inline]
138 pub fn timeline_sliced(&self, timeline: TimelineName) -> Self {
139 let Self {
140 id,
141 entity_path,
142 heap_size_bytes: _,
143 is_sorted,
144 row_ids,
145 timelines,
146 components,
147 } = self;
148
149 let chunk = Self {
150 id: *id,
151 entity_path: entity_path.clone(),
152 heap_size_bytes: Default::default(),
153 is_sorted: *is_sorted,
154 row_ids: row_ids.clone(),
155 timelines: timelines
156 .get_key_value(&timeline)
157 .map(|(timeline, time_column)| (*timeline, time_column.clone()))
158 .into_iter()
159 .collect(),
160 components: components.clone(),
161 };
162
163 #[cfg(debug_assertions)]
164 #[allow(clippy::unwrap_used)] chunk.sanity_check().unwrap();
166
167 chunk
168 }
169
170 #[must_use]
180 #[inline]
181 pub fn component_sliced(&self, component_descr: &ComponentDescriptor) -> Self {
182 let Self {
183 id,
184 entity_path,
185 heap_size_bytes: _,
186 is_sorted,
187 row_ids,
188 timelines,
189 components,
190 } = self;
191
192 let chunk = Self {
193 id: *id,
194 entity_path: entity_path.clone(),
195 heap_size_bytes: Default::default(),
196 is_sorted: *is_sorted,
197 row_ids: row_ids.clone(),
198 timelines: timelines.clone(),
199 components: crate::ChunkComponents(
200 components
201 .get(component_descr)
202 .map(|list_array| (component_descr.clone(), list_array.clone()))
203 .into_iter()
204 .collect(),
205 ),
206 };
207
208 #[cfg(debug_assertions)]
209 #[allow(clippy::unwrap_used)] chunk.sanity_check().unwrap();
211
212 chunk
213 }
214
215 #[must_use]
225 #[inline]
226 pub fn timelines_sliced(&self, timelines_to_keep: &IntSet<TimelineName>) -> Self {
227 let Self {
228 id,
229 entity_path,
230 heap_size_bytes: _,
231 is_sorted,
232 row_ids,
233 timelines,
234 components,
235 } = self;
236
237 let chunk = Self {
238 id: *id,
239 entity_path: entity_path.clone(),
240 heap_size_bytes: Default::default(),
241 is_sorted: *is_sorted,
242 row_ids: row_ids.clone(),
243 timelines: timelines
244 .iter()
245 .filter(|(timeline, _)| timelines_to_keep.contains(timeline))
246 .map(|(timeline, time_column)| (*timeline, time_column.clone()))
247 .collect(),
248 components: components.clone(),
249 };
250
251 #[cfg(debug_assertions)]
252 #[allow(clippy::unwrap_used)] chunk.sanity_check().unwrap();
254
255 chunk
256 }
257
258 #[must_use]
270 #[inline]
271 pub fn densified(&self, component_descr_pov: &ComponentDescriptor) -> Self {
272 let Self {
273 id,
274 entity_path,
275 heap_size_bytes: _,
276 is_sorted,
277 row_ids,
278 timelines,
279 components,
280 } = self;
281
282 if self.is_empty() {
283 return self.clone();
284 }
285
286 let Some(component_list_array) = self.components.get(component_descr_pov) else {
287 return self.clone();
288 };
289
290 let Some(validity) = component_list_array.nulls() else {
291 return self.clone();
292 };
293
294 re_tracing::profile_function!();
295
296 let mask = validity.iter().collect_vec();
297 let is_sorted = *is_sorted || (mask.iter().filter(|&&b| b).count() < 2);
298 let validity_filter = ArrowBooleanArray::from(mask);
299
300 let mut chunk = Self {
301 id: *id,
302 entity_path: entity_path.clone(),
303 heap_size_bytes: Default::default(),
304 is_sorted,
305 row_ids: re_arrow_util::filter_array(row_ids, &validity_filter),
306 timelines: timelines
307 .iter()
308 .map(|(&timeline, time_column)| (timeline, time_column.filtered(&validity_filter)))
309 .collect(),
310 components: components
311 .iter()
312 .map(|(component_desc, list_array)| {
313 let filtered = re_arrow_util::filter_array(list_array, &validity_filter);
314 let filtered = if component_desc == component_descr_pov {
315 let (field, offsets, values, _nulls) = filtered.into_parts();
320 ArrowListArray::new(field, offsets, values, None)
321 } else {
322 filtered
323 };
324
325 (component_desc.clone(), filtered)
326 })
327 .collect(),
328 };
329
330 chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();
346
347 #[cfg(debug_assertions)]
348 #[allow(clippy::unwrap_used)] chunk.sanity_check().unwrap();
350
351 chunk
352 }
353
354 #[must_use]
360 #[inline]
361 pub fn emptied(&self) -> Self {
362 let Self {
363 id,
364 entity_path,
365 heap_size_bytes: _,
366 is_sorted: _,
367 row_ids: _,
368 timelines,
369 components,
370 } = self;
371
372 re_tracing::profile_function!();
373
374 Self {
375 id: *id,
376 entity_path: entity_path.clone(),
377 heap_size_bytes: Default::default(),
378 is_sorted: true,
379 row_ids: RowId::arrow_from_slice(&[]),
380 timelines: timelines
381 .iter()
382 .map(|(&timeline, time_column)| (timeline, time_column.emptied()))
383 .collect(),
384 components: components
385 .iter()
386 .map(|(component_desc, list_array)| {
387 let field = match list_array.data_type() {
388 arrow::datatypes::DataType::List(field) => field.clone(),
389 _ => unreachable!("This is always s list array"),
390 };
391 (component_desc.clone(), ArrowListArray::new_null(field, 0))
392 })
393 .collect(),
394 }
395 }
396
397 #[must_use]
404 #[inline]
405 pub fn components_removed(self) -> Self {
406 let Self {
407 id,
408 entity_path,
409 heap_size_bytes: _,
410 is_sorted,
411 row_ids,
412 timelines,
413 components: _,
414 } = self;
415
416 Self {
417 id,
418 entity_path,
419 heap_size_bytes: Default::default(), is_sorted,
421 row_ids,
422 timelines,
423 components: Default::default(),
424 }
425 }
426
427 #[inline]
441 pub fn deduped_latest_on_index(&self, index: &TimelineName) -> Self {
442 re_tracing::profile_function!();
443
444 if self.is_empty() {
445 return self.clone();
446 }
447
448 if self.is_static() {
449 return self.row_sliced(self.num_rows().saturating_sub(1), 1);
450 }
451
452 let Some(time_column) = self.timelines.get(index) else {
453 return self.clone();
454 };
455
456 let indices = {
457 let mut i = 0;
458 let indices = time_column
459 .times_raw()
460 .iter()
461 .copied()
462 .dedup_with_count()
463 .map(|(count, _time)| {
464 i += count;
465 i.saturating_sub(1) as i32
466 })
467 .collect_vec();
468 arrow::array::Int32Array::from(indices)
469 };
470
471 let chunk = Self {
472 id: self.id,
473 entity_path: self.entity_path.clone(),
474 heap_size_bytes: Default::default(),
475 is_sorted: self.is_sorted,
476 row_ids: re_arrow_util::take_array(
477 &self.row_ids,
478 &arrow::array::Int32Array::from(indices.clone()),
479 ),
480 timelines: self
481 .timelines
482 .iter()
483 .map(|(&timeline, time_column)| (timeline, time_column.taken(&indices)))
484 .collect(),
485 components: self
486 .components
487 .iter()
488 .map(|(component_desc, list_array)| {
489 let filtered = re_arrow_util::take_array(list_array, &indices);
490 (component_desc.clone(), filtered)
491 })
492 .collect(),
493 };
494
495 #[cfg(debug_assertions)]
496 #[allow(clippy::unwrap_used)] {
498 chunk.sanity_check().unwrap();
499 }
500
501 chunk
502 }
503
504 #[must_use]
517 #[inline]
518 pub fn filtered(&self, filter: &ArrowBooleanArray) -> Option<Self> {
519 let Self {
520 id,
521 entity_path,
522 heap_size_bytes: _,
523 is_sorted,
524 row_ids,
525 timelines,
526 components,
527 } = self;
528
529 if filter.len() != self.num_rows() {
531 return None;
532 }
533
534 if self.is_empty() {
535 return Some(self.clone());
536 }
537
538 let num_filtered = filter.values().iter().filter(|&b| b).count();
539 if num_filtered == 0 {
540 return Some(self.emptied());
541 }
542
543 re_tracing::profile_function!();
544
545 let is_sorted = *is_sorted || num_filtered < 2;
546
547 let mut chunk = Self {
548 id: *id,
549 entity_path: entity_path.clone(),
550 heap_size_bytes: Default::default(),
551 is_sorted,
552 row_ids: re_arrow_util::filter_array(row_ids, filter),
553 timelines: timelines
554 .iter()
555 .map(|(&timeline, time_column)| (timeline, time_column.filtered(filter)))
556 .collect(),
557 components: components
558 .iter()
559 .map(|(component_desc, list_array)| {
560 let filtered = re_arrow_util::filter_array(list_array, filter);
561 (component_desc.clone(), filtered)
562 })
563 .collect(),
564 };
565
566 chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();
582
583 #[cfg(debug_assertions)]
584 #[allow(clippy::unwrap_used)] chunk.sanity_check().unwrap();
586
587 Some(chunk)
588 }
589
590 #[must_use]
603 #[inline]
604 pub fn taken(&self, indices: &arrow::array::Int32Array) -> Self {
605 let Self {
606 id,
607 entity_path,
608 heap_size_bytes: _,
609 is_sorted,
610 row_ids,
611 timelines,
612 components,
613 } = self;
614
615 if self.is_empty() {
616 return self.clone();
617 }
618
619 if indices.is_empty() {
620 return self.emptied();
621 }
622
623 re_tracing::profile_function!();
624
625 let is_sorted = *is_sorted || (indices.len() < 2);
626
627 let mut chunk = Self {
628 id: *id,
629 entity_path: entity_path.clone(),
630 heap_size_bytes: Default::default(),
631 is_sorted,
632 row_ids: re_arrow_util::take_array(
633 row_ids,
634 &arrow::array::Int32Array::from(indices.clone()),
635 ),
636 timelines: timelines
637 .iter()
638 .map(|(&timeline, time_column)| (timeline, time_column.taken(indices)))
639 .collect(),
640 components: components
641 .iter()
642 .map(|(component_desc, list_array)| {
643 let taken = re_arrow_util::take_array(list_array, indices);
644 (component_desc.clone(), taken)
645 })
646 .collect(),
647 };
648
649 chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();
665
666 #[cfg(debug_assertions)]
667 #[allow(clippy::unwrap_used)] chunk.sanity_check().unwrap();
669
670 chunk
671 }
672}
673
674impl TimeColumn {
675 #[inline]
683 pub fn row_sliced(&self, index: usize, len: usize) -> Self {
684 let Self {
685 timeline,
686 times,
687 is_sorted,
688 time_range: _,
689 } = self;
690
691 if index >= self.num_rows() {
695 return self.emptied();
696 }
697
698 let end_offset = usize::min(index.saturating_add(len), self.num_rows());
699 let len = end_offset.saturating_sub(index);
700
701 if len == 0 {
702 return self.emptied();
703 }
704
705 let is_sorted = *is_sorted || (len < 2);
706
707 let is_sorted_opt = is_sorted.then_some(is_sorted);
727
728 Self::new(is_sorted_opt, *timeline, times.clone().slice(index, len))
729 }
730
731 #[inline]
735 pub fn emptied(&self) -> Self {
736 let Self {
737 timeline,
738 times: _,
739 is_sorted: _,
740 time_range: _,
741 } = self;
742
743 Self::new(Some(true), *timeline, vec![].into())
744 }
745
746 #[inline]
750 pub(crate) fn filtered(&self, filter: &ArrowBooleanArray) -> Self {
751 let Self {
752 timeline,
753 times,
754 is_sorted,
755 time_range: _,
756 } = self;
757
758 let is_sorted = *is_sorted || filter.values().iter().filter(|&b| b).count() < 2;
759
760 let is_sorted_opt = is_sorted.then_some(is_sorted);
776
777 Self::new(
778 is_sorted_opt,
779 *timeline,
780 re_arrow_util::filter_array(
781 &arrow::array::Int64Array::new(times.clone(), None),
782 filter,
783 )
784 .into_parts()
785 .1,
786 )
787 }
788
789 #[inline]
793 pub(crate) fn taken(&self, indices: &arrow::array::Int32Array) -> Self {
794 let Self {
795 timeline,
796 times,
797 is_sorted,
798 time_range: _,
799 } = self;
800
801 let new_times = re_arrow_util::take_array(
802 &arrow::array::Int64Array::new(times.clone(), None),
803 &arrow::array::Int32Array::from(indices.clone()),
804 )
805 .into_parts()
806 .1;
807
808 Self::new(Some(*is_sorted), *timeline, new_times)
809 }
810}
811
812#[cfg(test)]
815mod tests {
816 use itertools::Itertools as _;
817 use re_log_types::{
818 TimePoint,
819 example_components::{MyColor, MyLabel, MyPoint, MyPoints},
820 };
821
822 use crate::{Chunk, RowId, Timeline};
823
824 use super::*;
825
/// `Chunk::cell` must return the same cells whether the chunk is unsorted (linear scan)
/// or sorted (binary search).
#[test]
fn cell() -> anyhow::Result<()> {
    let log_time = Timeline::log_time();
    let frame = Timeline::new_sequence("frame");

    let row_id1 = RowId::ZERO.incremented_by(10);
    let row_id2 = RowId::ZERO.incremented_by(20);
    let row_id3 = RowId::ZERO.incremented_by(30);
    let row_id4 = RowId::new();
    let row_id5 = RowId::new();

    let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
    let points3 = &[MyPoint::new(6.0, 7.0)];

    let colors4 = &[MyColor::from_rgb(1, 1, 1)];
    let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];

    let labels1 = &[MyLabel("a".into())];
    let labels2 = &[MyLabel("b".into())];
    let labels3 = &[MyLabel("c".into())];
    let labels4 = &[MyLabel("d".into())];
    let labels5 = &[MyLabel("e".into())];

    // Rows are deliberately inserted out of `RowId` order (2, 5, 1, 4, 3).
    let mut chunk = Chunk::builder("my/entity")
        .with_sparse_component_batches(
            row_id2,
            [(log_time, 1096), (frame, 7)],
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors4 as _)),
                (MyPoints::descriptor_labels(), Some(labels4 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id5,
            [(log_time, 1128), (frame, 9)],
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors5 as _)),
                (MyPoints::descriptor_labels(), Some(labels5 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id1,
            [(log_time, 1064), (frame, 5)],
            [
                (MyPoints::descriptor_points(), Some(points1 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            [(log_time, 1032), (frame, 3)],
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            [(log_time, 1000), (frame, 1)],
            [
                (MyPoints::descriptor_points(), Some(points3 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels3 as _)),
            ],
        )
        .build()?;

    eprintln!("chunk:\n{chunk}");

    let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
        (row_id1, MyPoints::descriptor_points(), Some(points1 as _)),
        (row_id2, MyPoints::descriptor_labels(), Some(labels4 as _)),
        (row_id3, MyPoints::descriptor_colors(), None),
        (row_id4, MyPoints::descriptor_labels(), Some(labels2 as _)),
        (row_id5, MyPoints::descriptor_colors(), Some(colors5 as _)),
    ];

    // Checks every expectation against the given chunk.
    let assert_cells = |under_test: &Chunk| {
        for (row_id, component_desc, expected) in expectations {
            let expected = expected
                .and_then(|batch| re_types_core::ComponentBatch::to_arrow(batch).ok());
            eprintln!("{component_desc} @ {row_id}");
            similar_asserts::assert_eq!(expected, under_test.cell(*row_id, component_desc));
        }
    };

    // First pass: unsorted chunk.
    assert!(!chunk.is_sorted());
    assert_cells(&chunk);

    // Second pass: same lookups once the chunk has been sorted.
    chunk.sort_if_unsorted();
    assert!(chunk.is_sorted());
    assert_cells(&chunk);

    Ok(())
}
947
/// `Chunk::deduped_latest_on_index` on a temporal chunk.
///
/// The expectations are looked up in `got` (the deduplicated chunk), not in the source
/// chunk, so that the data of the result is what actually gets verified.
#[test]
fn dedupe_temporal() -> anyhow::Result<()> {
    let entity_path = "my/entity";

    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();
    let row_id4 = RowId::new();
    let row_id5 = RowId::new();

    // `log_time` is unique per row, `frame` has two runs: [1, 1, 1] and [2, 2].
    let timepoint1 = [
        (Timeline::log_time(), 1000),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint2 = [
        (Timeline::log_time(), 1032),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint3 = [
        (Timeline::log_time(), 1064),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint4 = [
        (Timeline::log_time(), 1096),
        (Timeline::new_sequence("frame"), 2),
    ];
    let timepoint5 = [
        (Timeline::log_time(), 1128),
        (Timeline::new_sequence("frame"), 2),
    ];

    let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
    let points3 = &[MyPoint::new(6.0, 7.0)];

    let colors4 = &[MyColor::from_rgb(1, 1, 1)];
    let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];

    let labels1 = &[MyLabel("a".into())];
    let labels2 = &[MyLabel("b".into())];
    let labels3 = &[MyLabel("c".into())];
    let labels4 = &[MyLabel("d".into())];
    let labels5 = &[MyLabel("e".into())];

    let chunk = Chunk::builder(entity_path)
        .with_sparse_component_batches(
            row_id1,
            timepoint1,
            [
                (MyPoints::descriptor_points(), Some(points1 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id2,
            timepoint2,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            timepoint3,
            [
                (MyPoints::descriptor_points(), Some(points3 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels3 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            timepoint4,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors4 as _)),
                (MyPoints::descriptor_labels(), Some(labels4 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id5,
            timepoint5,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors5 as _)),
                (MyPoints::descriptor_labels(), Some(labels5 as _)),
            ],
        )
        .build()?;

    eprintln!("chunk:\n{chunk}");

    // Deduplicating on `frame` keeps the last row of each run: rows 3 and 5.
    {
        let got = chunk.deduped_latest_on_index(&TimelineName::new("frame"));
        eprintln!("got:\n{got}");
        assert_eq!(2, got.num_rows());

        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
            (row_id3, MyPoints::descriptor_points(), Some(points3 as _)),
            (row_id3, MyPoints::descriptor_colors(), None),
            (row_id3, MyPoints::descriptor_labels(), Some(labels3 as _)),
            (row_id5, MyPoints::descriptor_points(), None),
            (row_id5, MyPoints::descriptor_colors(), Some(colors5 as _)),
            (row_id5, MyPoints::descriptor_labels(), Some(labels5 as _)),
        ];

        for (row_id, component_desc, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component_desc} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, component_desc));
        }
    }

    // Deduplicating on `log_time` keeps everything: all values are distinct.
    {
        let got = chunk.deduped_latest_on_index(&TimelineName::log_time());
        eprintln!("got:\n{got}");
        assert_eq!(5, got.num_rows());

        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
            (row_id1, MyPoints::descriptor_points(), Some(points1 as _)),
            (row_id1, MyPoints::descriptor_colors(), None),
            (row_id1, MyPoints::descriptor_labels(), Some(labels1 as _)),
            (row_id2, MyPoints::descriptor_points(), None),
            (row_id2, MyPoints::descriptor_colors(), None),
            (row_id2, MyPoints::descriptor_labels(), Some(labels2 as _)),
            (row_id3, MyPoints::descriptor_points(), Some(points3 as _)),
            (row_id3, MyPoints::descriptor_colors(), None),
            (row_id3, MyPoints::descriptor_labels(), Some(labels3 as _)),
            (row_id4, MyPoints::descriptor_points(), None),
            (row_id4, MyPoints::descriptor_colors(), Some(colors4 as _)),
            (row_id4, MyPoints::descriptor_labels(), Some(labels4 as _)),
            (row_id5, MyPoints::descriptor_points(), None),
            (row_id5, MyPoints::descriptor_colors(), Some(colors5 as _)),
            (row_id5, MyPoints::descriptor_labels(), Some(labels5 as _)),
        ];

        for (row_id, component_desc, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component_desc} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, component_desc));
        }
    }

    Ok(())
}
1097
/// `Chunk::deduped_latest_on_index` on a static chunk: whatever the requested timeline,
/// only the very last row must remain.
///
/// The expectations are looked up in `got` (the deduplicated chunk), not in the source
/// chunk, so that the data of the result is what actually gets verified.
#[test]
fn dedupe_static() -> anyhow::Result<()> {
    let entity_path = "my/entity";

    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();
    let row_id4 = RowId::new();
    let row_id5 = RowId::new();

    let timepoint_static = TimePoint::default();

    let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
    let points3 = &[MyPoint::new(6.0, 7.0)];

    let colors4 = &[MyColor::from_rgb(1, 1, 1)];
    let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];

    let labels1 = &[MyLabel("a".into())];
    let labels2 = &[MyLabel("b".into())];
    let labels3 = &[MyLabel("c".into())];
    let labels4 = &[MyLabel("d".into())];
    let labels5 = &[MyLabel("e".into())];

    let chunk = Chunk::builder(entity_path)
        .with_sparse_component_batches(
            row_id1,
            timepoint_static.clone(),
            [
                (MyPoints::descriptor_points(), Some(points1 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id2,
            timepoint_static.clone(),
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            timepoint_static.clone(),
            [
                (MyPoints::descriptor_points(), Some(points3 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels3 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            timepoint_static.clone(),
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors4 as _)),
                (MyPoints::descriptor_labels(), Some(labels4 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id5,
            timepoint_static.clone(),
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors5 as _)),
                (MyPoints::descriptor_labels(), Some(labels5 as _)),
            ],
        )
        .build()?;

    eprintln!("chunk:\n{chunk}");

    {
        let got = chunk.deduped_latest_on_index(&TimelineName::new("frame"));
        eprintln!("got:\n{got}");
        assert_eq!(1, got.num_rows());

        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
            (row_id5, MyPoints::descriptor_points(), None),
            (row_id5, MyPoints::descriptor_colors(), Some(colors5 as _)),
            (row_id5, MyPoints::descriptor_labels(), Some(labels5 as _)),
        ];

        for (row_id, component_desc, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component_desc} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, component_desc));
        }
    }

    {
        let got = chunk.deduped_latest_on_index(&TimelineName::log_time());
        eprintln!("got:\n{got}");
        assert_eq!(1, got.num_rows());

        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
            (row_id5, MyPoints::descriptor_points(), None),
            (row_id5, MyPoints::descriptor_colors(), Some(colors5 as _)),
            (row_id5, MyPoints::descriptor_labels(), Some(labels5 as _)),
        ];

        for (row_id, component_desc, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component_desc} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, component_desc));
        }
    }

    Ok(())
}
1212
/// `Chunk::filtered`: a well-sized filter keeps exactly the selected rows, a filter of
/// the wrong length is rejected.
///
/// The expectations are looked up in `got` (the filtered chunk), not in the source chunk,
/// so that the data of the result is what actually gets verified.
#[test]
fn filtered() -> anyhow::Result<()> {
    let entity_path = "my/entity";

    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();
    let row_id4 = RowId::new();
    let row_id5 = RowId::new();

    let timepoint1 = [
        (Timeline::log_time(), 1000),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint2 = [
        (Timeline::log_time(), 1032),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint3 = [
        (Timeline::log_time(), 1064),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint4 = [
        (Timeline::log_time(), 1096),
        (Timeline::new_sequence("frame"), 2),
    ];
    let timepoint5 = [
        (Timeline::log_time(), 1128),
        (Timeline::new_sequence("frame"), 2),
    ];

    let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
    let points3 = &[MyPoint::new(6.0, 7.0)];

    let colors4 = &[MyColor::from_rgb(1, 1, 1)];
    let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];

    let labels1 = &[MyLabel("a".into())];
    let labels2 = &[MyLabel("b".into())];
    let labels3 = &[MyLabel("c".into())];
    let labels4 = &[MyLabel("d".into())];
    let labels5 = &[MyLabel("e".into())];

    let chunk = Chunk::builder(entity_path)
        .with_sparse_component_batches(
            row_id1,
            timepoint1,
            [
                (MyPoints::descriptor_points(), Some(points1 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id2,
            timepoint2,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            timepoint3,
            [
                (MyPoints::descriptor_points(), Some(points3 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels3 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            timepoint4,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors4 as _)),
                (MyPoints::descriptor_labels(), Some(labels4 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id5,
            timepoint5,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors5 as _)),
                (MyPoints::descriptor_labels(), Some(labels5 as _)),
            ],
        )
        .build()?;

    eprintln!("chunk:\n{chunk}");

    // Filter of the right length that keeps every other row: rows 1, 3 and 5.
    {
        let filter =
            ArrowBooleanArray::from((0..chunk.num_rows()).map(|i| i % 2 == 0).collect_vec());
        let got = chunk.filtered(&filter).unwrap();
        eprintln!("got:\n{got}");
        assert_eq!(
            filter.values().iter().filter(|&b| b).count(),
            got.num_rows()
        );

        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
            (row_id1, MyPoints::descriptor_points(), Some(points1 as _)),
            (row_id1, MyPoints::descriptor_colors(), None),
            (row_id1, MyPoints::descriptor_labels(), Some(labels1 as _)),
            (row_id3, MyPoints::descriptor_points(), Some(points3 as _)),
            (row_id3, MyPoints::descriptor_colors(), None),
            (row_id3, MyPoints::descriptor_labels(), Some(labels3 as _)),
            (row_id5, MyPoints::descriptor_points(), None),
            (row_id5, MyPoints::descriptor_colors(), Some(colors5 as _)),
            (row_id5, MyPoints::descriptor_labels(), Some(labels5 as _)),
        ];

        for (row_id, component_desc, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component_desc} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, component_desc));
        }
    }

    // Filter shorter than the chunk: rejected.
    {
        let filter = ArrowBooleanArray::from(
            (0..chunk.num_rows() / 2).map(|i| i % 2 == 0).collect_vec(),
        );
        let got = chunk.filtered(&filter);
        assert!(got.is_none());
    }

    // Filter longer than the chunk: rejected.
    {
        let filter = ArrowBooleanArray::from(
            (0..chunk.num_rows() * 2).map(|i| i % 2 == 0).collect_vec(),
        );
        let got = chunk.filtered(&filter);
        assert!(got.is_none());
    }

    Ok(())
}
1359
/// `Chunk::taken`: taking every other row, then taking the same row many times over.
///
/// The expectations are looked up in `got` (the taken chunk), not in the source chunk, so
/// that the data of the result is what actually gets verified.
#[test]
fn taken() -> anyhow::Result<()> {
    use arrow::array::Int32Array as ArrowInt32Array;

    let entity_path = "my/entity";

    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();
    let row_id4 = RowId::new();
    let row_id5 = RowId::new();

    let timepoint1 = [
        (Timeline::log_time(), 1000),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint2 = [
        (Timeline::log_time(), 1032),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint3 = [
        (Timeline::log_time(), 1064),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint4 = [
        (Timeline::log_time(), 1096),
        (Timeline::new_sequence("frame"), 2),
    ];
    let timepoint5 = [
        (Timeline::log_time(), 1128),
        (Timeline::new_sequence("frame"), 2),
    ];

    let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
    let points3 = &[MyPoint::new(6.0, 7.0)];

    let colors4 = &[MyColor::from_rgb(1, 1, 1)];
    let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];

    let labels1 = &[MyLabel("a".into())];
    let labels2 = &[MyLabel("b".into())];
    let labels3 = &[MyLabel("c".into())];
    let labels4 = &[MyLabel("d".into())];
    let labels5 = &[MyLabel("e".into())];

    let chunk = Chunk::builder(entity_path)
        .with_sparse_component_batches(
            row_id1,
            timepoint1,
            [
                (MyPoints::descriptor_points(), Some(points1 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id2,
            timepoint2,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            timepoint3,
            [
                (MyPoints::descriptor_points(), Some(points3 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels3 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            timepoint4,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors4 as _)),
                (MyPoints::descriptor_labels(), Some(labels4 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id5,
            timepoint5,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors5 as _)),
                (MyPoints::descriptor_labels(), Some(labels5 as _)),
            ],
        )
        .build()?;

    eprintln!("chunk:\n{chunk}");

    // Take every other row: rows 1, 3 and 5.
    {
        let indices = ArrowInt32Array::from(
            (0..chunk.num_rows() as i32)
                .filter(|i| i % 2 == 0)
                .collect_vec(),
        );
        let got = chunk.taken(&indices);
        eprintln!("got:\n{got}");
        assert_eq!(indices.len(), got.num_rows());

        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
            (row_id1, MyPoints::descriptor_points(), Some(points1 as _)),
            (row_id1, MyPoints::descriptor_colors(), None),
            (row_id1, MyPoints::descriptor_labels(), Some(labels1 as _)),
            (row_id3, MyPoints::descriptor_points(), Some(points3 as _)),
            (row_id3, MyPoints::descriptor_colors(), None),
            (row_id3, MyPoints::descriptor_labels(), Some(labels3 as _)),
            (row_id5, MyPoints::descriptor_points(), None),
            (row_id5, MyPoints::descriptor_colors(), Some(colors5 as _)),
            (row_id5, MyPoints::descriptor_labels(), Some(labels5 as _)),
        ];

        for (row_id, component_desc, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component_desc} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, component_desc));
        }
    }

    // Take the same row over and over: the result has more rows than the source.
    {
        let indices = ArrowInt32Array::from(
            std::iter::repeat_n(2i32, chunk.num_rows() * 2).collect_vec(),
        );
        let got = chunk.taken(&indices);
        eprintln!("got:\n{got}");
        assert_eq!(indices.len(), got.num_rows());
    }

    Ok(())
}
1500}