1use arrow::array::{
2 Array as _, ArrayRef as ArrowArrayRef, BooleanArray as ArrowBooleanArray,
3 ListArray as ArrowListArray,
4};
5use itertools::Itertools as _;
6use nohash_hasher::IntSet;
7use re_log_types::TimelineName;
8use re_types_core::{ComponentIdentifier, SerializedComponentColumn};
9
10use crate::{Chunk, RowId, TimeColumn};
11
12impl Chunk {
18 pub fn cell(&self, row_id: RowId, component: ComponentIdentifier) -> Option<ArrowArrayRef> {
24 let list_array = self.components.get_array(component)?;
25
26 if self.is_sorted() {
27 let row_ids = self.row_ids_slice();
28 let index = row_ids.binary_search(&row_id).ok()?;
29 list_array.is_valid(index).then(|| list_array.value(index))
30 } else {
31 let (index, _) = self.row_ids().find_position(|id| *id == row_id)?;
32 list_array.is_valid(index).then(|| list_array.value(index))
33 }
34 }
35
36 #[must_use]
59 pub fn row_sliced_shallow(&self, index: usize, len: usize) -> Self {
60 let deep = false;
61 self.row_sliced_impl(index, len, deep)
62 }
63
64 #[must_use]
87 pub fn row_sliced_deep(&self, index: usize, len: usize) -> Self {
88 let deep = true;
89 self.row_sliced_impl(index, len, deep)
90 }
91
92 #[must_use]
93 fn row_sliced_impl(&self, index: usize, len: usize, deep: bool) -> Self {
94 re_tracing::profile_function!(if deep { "deep" } else { "shallow" });
95
96 let Self {
97 id,
98 entity_path,
99 heap_size_bytes: _,
100 is_sorted,
101 row_ids,
102 timelines,
103 components,
104 } = self;
105
106 if index >= self.num_rows() {
110 return self.emptied();
111 }
112
113 let end_offset = usize::min(index.saturating_add(len), self.num_rows());
114 let len = end_offset.saturating_sub(index);
115
116 if len == 0 {
117 return self.emptied();
118 }
119
120 let is_sorted = *is_sorted || (len < 2);
121
122 let mut chunk = Self {
123 id: *id,
124 entity_path: entity_path.clone(),
125 heap_size_bytes: Default::default(),
126 is_sorted,
127 row_ids: if deep {
128 re_arrow_util::deep_slice_array(row_ids, index, len)
129 } else {
130 row_ids.slice(index, len)
131 },
132 timelines: timelines
133 .iter()
134 .map(|(timeline, time_column)| (*timeline, time_column.row_sliced(index, len)))
135 .collect(),
136 components: components
137 .values()
138 .map(|column| {
139 SerializedComponentColumn::new(
140 if deep {
141 re_arrow_util::deep_slice_array(&column.list_array, index, len)
142 } else {
143 column.list_array.slice(index, len)
144 },
145 column.descriptor.clone(),
146 )
147 })
148 .collect(),
149 };
150
151 chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();
171
172 #[cfg(debug_assertions)]
173 #[expect(clippy::unwrap_used)] chunk.sanity_check().unwrap();
175
176 chunk
177 }
178
179 #[must_use]
189 #[inline]
190 pub fn timeline_sliced(&self, timeline: TimelineName) -> Self {
191 let Self {
192 id,
193 entity_path,
194 heap_size_bytes: _,
195 is_sorted,
196 row_ids,
197 timelines,
198 components,
199 } = self;
200
201 let chunk = Self {
202 id: *id,
203 entity_path: entity_path.clone(),
204 heap_size_bytes: Default::default(),
205 is_sorted: *is_sorted,
206 row_ids: row_ids.clone(),
207 timelines: timelines
208 .get_key_value(&timeline)
209 .map(|(timeline, time_column)| (*timeline, time_column.clone()))
210 .into_iter()
211 .collect(),
212 components: components.clone(),
213 };
214
215 #[cfg(debug_assertions)]
216 #[expect(clippy::unwrap_used)] chunk.sanity_check().unwrap();
218
219 chunk
220 }
221
222 #[must_use]
232 #[inline]
233 pub fn component_sliced(&self, component: ComponentIdentifier) -> Self {
234 let Self {
235 id,
236 entity_path,
237 heap_size_bytes: _,
238 is_sorted,
239 row_ids,
240 timelines,
241 components,
242 } = self;
243
244 let chunk = Self {
245 id: *id,
246 entity_path: entity_path.clone(),
247 heap_size_bytes: Default::default(),
248 is_sorted: *is_sorted,
249 row_ids: row_ids.clone(),
250 timelines: timelines.clone(),
251 components: components
252 .get(component)
253 .map(|column| {
254 SerializedComponentColumn::new(
255 column.list_array.clone(),
256 column.descriptor.clone(),
257 )
258 })
259 .into_iter()
260 .collect(),
261 };
262
263 #[cfg(debug_assertions)]
264 #[expect(clippy::unwrap_used)] chunk.sanity_check().unwrap();
266
267 chunk
268 }
269
270 #[must_use]
280 #[inline]
281 pub fn timelines_sliced(&self, timelines_to_keep: &IntSet<TimelineName>) -> Self {
282 let Self {
283 id,
284 entity_path,
285 heap_size_bytes: _,
286 is_sorted,
287 row_ids,
288 timelines,
289 components,
290 } = self;
291
292 let chunk = Self {
293 id: *id,
294 entity_path: entity_path.clone(),
295 heap_size_bytes: Default::default(),
296 is_sorted: *is_sorted,
297 row_ids: row_ids.clone(),
298 timelines: timelines
299 .iter()
300 .filter(|(timeline, _)| timelines_to_keep.contains(timeline))
301 .map(|(timeline, time_column)| (*timeline, time_column.clone()))
302 .collect(),
303 components: components.clone(),
304 };
305
306 #[cfg(debug_assertions)]
307 #[expect(clippy::unwrap_used)] chunk.sanity_check().unwrap();
309
310 chunk
311 }
312
313 #[must_use]
325 #[inline]
326 pub fn densified(&self, component_pov: ComponentIdentifier) -> Self {
327 let Self {
328 id,
329 entity_path,
330 heap_size_bytes: _,
331 is_sorted,
332 row_ids,
333 timelines,
334 components,
335 } = self;
336
337 if self.is_empty() {
338 return self.clone();
339 }
340
341 let Some(component_list_array) = self.components.get_array(component_pov) else {
342 return self.clone();
343 };
344
345 let Some(validity) = component_list_array.nulls() else {
346 return self.clone();
347 };
348
349 re_tracing::profile_function!();
350
351 let mask = validity.iter().collect_vec();
352 let is_sorted = *is_sorted || (mask.iter().filter(|&&b| b).count() < 2);
353 let validity_filter = ArrowBooleanArray::from(mask);
354
355 let mut chunk = Self {
356 id: *id,
357 entity_path: entity_path.clone(),
358 heap_size_bytes: Default::default(),
359 is_sorted,
360 row_ids: re_arrow_util::filter_array(row_ids, &validity_filter),
361 timelines: timelines
362 .iter()
363 .map(|(&timeline, time_column)| (timeline, time_column.filtered(&validity_filter)))
364 .collect(),
365 components: components
366 .values()
367 .map(|column| {
368 let filtered =
369 re_arrow_util::filter_array(&column.list_array, &validity_filter);
370 let filtered = if column.descriptor.component == component_pov {
371 let (field, offsets, values, _nulls) = filtered.into_parts();
376 ArrowListArray::new(field, offsets, values, None)
377 } else {
378 filtered
379 };
380
381 SerializedComponentColumn::new(filtered, column.descriptor.clone())
382 })
383 .collect(),
384 };
385
386 chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();
402
403 #[cfg(debug_assertions)]
404 #[expect(clippy::unwrap_used)] chunk.sanity_check().unwrap();
406
407 chunk
408 }
409
410 #[must_use]
416 #[inline]
417 pub fn emptied(&self) -> Self {
418 let Self {
419 id,
420 entity_path,
421 heap_size_bytes: _,
422 is_sorted: _,
423 row_ids: _,
424 timelines,
425 components,
426 } = self;
427
428 re_tracing::profile_function!();
429
430 Self {
431 id: *id,
432 entity_path: entity_path.clone(),
433 heap_size_bytes: Default::default(),
434 is_sorted: true,
435 row_ids: RowId::arrow_from_slice(&[]),
436 timelines: timelines
437 .iter()
438 .map(|(&timeline, time_column)| (timeline, time_column.emptied()))
439 .collect(),
440 components: components
441 .values()
442 .map(|column| {
443 let field = match column.list_array.data_type() {
444 arrow::datatypes::DataType::List(field) => field.clone(),
445 _ => unreachable!("This is always s list array"),
446 };
447 SerializedComponentColumn::new(
448 ArrowListArray::new_null(field, 0),
449 column.descriptor.clone(),
450 )
451 })
452 .collect(),
453 }
454 }
455
456 #[must_use]
463 #[inline]
464 pub fn components_removed(self) -> Self {
465 let Self {
466 id,
467 entity_path,
468 heap_size_bytes: _,
469 is_sorted,
470 row_ids,
471 timelines,
472 components: _,
473 } = self;
474
475 Self {
476 id,
477 entity_path,
478 heap_size_bytes: Default::default(), is_sorted,
480 row_ids,
481 timelines,
482 components: Default::default(),
483 }
484 }
485
486 #[inline]
500 pub fn deduped_latest_on_index(&self, index: &TimelineName) -> Self {
501 re_tracing::profile_function!();
502
503 if self.is_empty() {
504 return self.clone();
505 }
506
507 if self.is_static() {
508 return self.row_sliced_shallow(self.num_rows().saturating_sub(1), 1);
509 }
510
511 let Some(time_column) = self.timelines.get(index) else {
512 return self.clone();
513 };
514
515 let indices = {
516 let mut i = 0;
517 let indices = time_column
518 .times_raw()
519 .iter()
520 .copied()
521 .dedup_with_count()
522 .map(|(count, _time)| {
523 i += count;
524
525 #[expect(clippy::cast_possible_wrap)]
527 {
528 i.saturating_sub(1) as i32
529 }
530 })
531 .collect_vec();
532 arrow::array::Int32Array::from(indices)
533 };
534
535 let chunk = Self {
536 id: self.id,
537 entity_path: self.entity_path.clone(),
538 heap_size_bytes: Default::default(),
539 is_sorted: self.is_sorted,
540 row_ids: re_arrow_util::take_array(
541 &self.row_ids,
542 &arrow::array::Int32Array::from(indices.clone()),
543 ),
544 timelines: self
545 .timelines
546 .iter()
547 .map(|(&timeline, time_column)| (timeline, time_column.taken(&indices)))
548 .collect(),
549 components: self
550 .components
551 .values()
552 .map(|column| {
553 let filtered = re_arrow_util::take_array(&column.list_array, &indices);
554 SerializedComponentColumn::new(filtered, column.descriptor.clone())
555 })
556 .collect(),
557 };
558
559 #[cfg(debug_assertions)]
560 #[expect(clippy::unwrap_used)] {
562 chunk.sanity_check().unwrap();
563 }
564
565 chunk
566 }
567
568 #[must_use]
581 #[inline]
582 pub fn filtered(&self, filter: &ArrowBooleanArray) -> Option<Self> {
583 let Self {
584 id,
585 entity_path,
586 heap_size_bytes: _,
587 is_sorted,
588 row_ids,
589 timelines,
590 components,
591 } = self;
592
593 if filter.len() != self.num_rows() {
595 return None;
596 }
597
598 if self.is_empty() {
599 return Some(self.clone());
600 }
601
602 let num_filtered = filter.values().iter().filter(|&b| b).count();
603 if num_filtered == 0 {
604 return Some(self.emptied());
605 }
606
607 re_tracing::profile_function!();
608
609 let is_sorted = *is_sorted || num_filtered < 2;
610
611 let mut chunk = Self {
612 id: *id,
613 entity_path: entity_path.clone(),
614 heap_size_bytes: Default::default(),
615 is_sorted,
616 row_ids: re_arrow_util::filter_array(row_ids, filter),
617 timelines: timelines
618 .iter()
619 .map(|(&timeline, time_column)| (timeline, time_column.filtered(filter)))
620 .collect(),
621 components: components
622 .values()
623 .map(|column| {
624 let filtered = re_arrow_util::filter_array(&column.list_array, filter);
625 SerializedComponentColumn::new(filtered, column.descriptor.clone())
626 })
627 .collect(),
628 };
629
630 chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();
646
647 #[cfg(debug_assertions)]
648 #[expect(clippy::unwrap_used)] chunk.sanity_check().unwrap();
650
651 Some(chunk)
652 }
653
654 #[must_use]
667 #[inline]
668 pub fn taken(&self, indices: &arrow::array::Int32Array) -> Self {
669 let Self {
670 id,
671 entity_path,
672 heap_size_bytes: _,
673 is_sorted,
674 row_ids,
675 timelines,
676 components,
677 } = self;
678
679 if self.is_empty() {
680 return self.clone();
681 }
682
683 if indices.is_empty() {
684 return self.emptied();
685 }
686
687 re_tracing::profile_function!();
688
689 let is_sorted = *is_sorted || (indices.len() < 2);
690
691 let mut chunk = Self {
692 id: *id,
693 entity_path: entity_path.clone(),
694 heap_size_bytes: Default::default(),
695 is_sorted,
696 row_ids: re_arrow_util::take_array(
697 row_ids,
698 &arrow::array::Int32Array::from(indices.clone()),
699 ),
700 timelines: timelines
701 .iter()
702 .map(|(&timeline, time_column)| (timeline, time_column.taken(indices)))
703 .collect(),
704 components: components
705 .values()
706 .map(|column| {
707 let taken = re_arrow_util::take_array(&column.list_array, indices);
708 SerializedComponentColumn::new(taken, column.descriptor.clone())
709 })
710 .collect(),
711 };
712
713 chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();
729
730 #[cfg(debug_assertions)]
731 #[expect(clippy::unwrap_used)] chunk.sanity_check().unwrap();
733
734 chunk
735 }
736}
737
738impl TimeColumn {
739 #[inline]
747 pub fn row_sliced(&self, index: usize, len: usize) -> Self {
748 let Self {
749 timeline,
750 times,
751 is_sorted,
752 time_range: _,
753 } = self;
754
755 if index >= self.num_rows() {
759 return self.emptied();
760 }
761
762 let end_offset = usize::min(index.saturating_add(len), self.num_rows());
763 let len = end_offset.saturating_sub(index);
764
765 if len == 0 {
766 return self.emptied();
767 }
768
769 let is_sorted = *is_sorted || (len < 2);
770
771 let is_sorted_opt = is_sorted.then_some(is_sorted);
791
792 Self::new(is_sorted_opt, *timeline, times.clone().slice(index, len))
793 }
794
795 #[inline]
799 pub fn emptied(&self) -> Self {
800 let Self {
801 timeline,
802 times: _,
803 is_sorted: _,
804 time_range: _,
805 } = self;
806
807 Self::new(Some(true), *timeline, vec![].into())
808 }
809
810 #[inline]
814 pub(crate) fn filtered(&self, filter: &ArrowBooleanArray) -> Self {
815 let Self {
816 timeline,
817 times,
818 is_sorted,
819 time_range: _,
820 } = self;
821
822 let is_sorted = *is_sorted || filter.values().iter().filter(|&b| b).count() < 2;
823
824 let is_sorted_opt = is_sorted.then_some(is_sorted);
840
841 Self::new(
842 is_sorted_opt,
843 *timeline,
844 re_arrow_util::filter_array(
845 &arrow::array::Int64Array::new(times.clone(), None),
846 filter,
847 )
848 .into_parts()
849 .1,
850 )
851 }
852
853 #[inline]
857 pub(crate) fn taken(&self, indices: &arrow::array::Int32Array) -> Self {
858 let Self {
859 timeline,
860 times,
861 is_sorted,
862 time_range: _,
863 } = self;
864
865 let new_times = re_arrow_util::take_array(
866 &arrow::array::Int64Array::new(times.clone(), None),
867 &arrow::array::Int32Array::from(indices.clone()),
868 )
869 .into_parts()
870 .1;
871
872 Self::new(Some(*is_sorted), *timeline, new_times)
873 }
874}
875
876#[cfg(test)]
879mod tests {
880 #![expect(clippy::cast_possible_wrap)]
881
882 use itertools::Itertools as _;
883 use re_log_types::TimePoint;
884 use re_log_types::example_components::{MyColor, MyLabel, MyPoint, MyPoints};
885
886 use super::*;
887 use crate::{Chunk, RowId, Timeline};
888
#[test]
fn cell() -> anyhow::Result<()> {
    let mypoints_points_component = MyPoints::descriptor_points().component;
    let mypoints_colors_component = MyPoints::descriptor_colors().component;
    let mypoints_labels_component = MyPoints::descriptor_labels().component;

    let entity_path = "my/entity";

    // Rows are inserted out of order below so that the chunk starts out unsorted.
    let row_id1 = RowId::ZERO.incremented_by(10);
    let row_id2 = RowId::ZERO.incremented_by(20);
    let row_id3 = RowId::ZERO.incremented_by(30);
    let row_id4 = RowId::new();
    let row_id5 = RowId::new();

    let timepoint1 = [
        (Timeline::log_time(), 1000),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint2 = [
        (Timeline::log_time(), 1032),
        (Timeline::new_sequence("frame"), 3),
    ];
    let timepoint3 = [
        (Timeline::log_time(), 1064),
        (Timeline::new_sequence("frame"), 5),
    ];
    let timepoint4 = [
        (Timeline::log_time(), 1096),
        (Timeline::new_sequence("frame"), 7),
    ];
    let timepoint5 = [
        (Timeline::log_time(), 1128),
        (Timeline::new_sequence("frame"), 9),
    ];

    let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
    let points3 = &[MyPoint::new(6.0, 7.0)];

    let colors4 = &[MyColor::from_rgb(1, 1, 1)];
    let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];

    let labels1 = &[MyLabel("a".into())];
    let labels2 = &[MyLabel("b".into())];
    let labels3 = &[MyLabel("c".into())];
    let labels4 = &[MyLabel("d".into())];
    let labels5 = &[MyLabel("e".into())];

    let mut chunk = Chunk::builder(entity_path)
        .with_sparse_component_batches(
            row_id2,
            timepoint4,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors4 as _)),
                (MyPoints::descriptor_labels(), Some(labels4 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id5,
            timepoint5,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors5 as _)),
                (MyPoints::descriptor_labels(), Some(labels5 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id1,
            timepoint3,
            [
                (MyPoints::descriptor_points(), Some(points1 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            timepoint2,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            timepoint1,
            [
                (MyPoints::descriptor_points(), Some(points3 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels3 as _)),
            ],
        )
        .build()?;

    eprintln!("chunk:\n{chunk}");

    let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
        (row_id1, mypoints_points_component, Some(points1 as _)),
        (row_id2, mypoints_labels_component, Some(labels4 as _)),
        (row_id3, mypoints_colors_component, None),
        (row_id4, mypoints_labels_component, Some(labels2 as _)),
        (row_id5, mypoints_colors_component, Some(colors5 as _)),
    ];

    // Checks every expectation against `Chunk::cell` on the given chunk.
    let assert_cells = |chunk: &Chunk| {
        for (row_id, component, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component} @ {row_id}");
            similar_asserts::assert_eq!(expected, chunk.cell(*row_id, *component));
        }
    };

    // Unsorted: exercises the linear-scan lookup.
    assert!(!chunk.is_sorted());
    assert_cells(&chunk);

    // Sorted: exercises the binary-search lookup.
    chunk.sort_if_unsorted();
    assert!(chunk.is_sorted());
    assert_cells(&chunk);

    Ok(())
}
1014
#[test]
fn dedupe_temporal() -> anyhow::Result<()> {
    let mypoints_points_component = MyPoints::descriptor_points().component;
    let mypoints_colors_component = MyPoints::descriptor_colors().component;
    let mypoints_labels_component = MyPoints::descriptor_labels().component;

    let entity_path = "my/entity";

    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();
    let row_id4 = RowId::new();
    let row_id5 = RowId::new();

    let timepoint1 = [
        (Timeline::log_time(), 1000),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint2 = [
        (Timeline::log_time(), 1032),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint3 = [
        (Timeline::log_time(), 1064),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint4 = [
        (Timeline::log_time(), 1096),
        (Timeline::new_sequence("frame"), 2),
    ];
    let timepoint5 = [
        (Timeline::log_time(), 1128),
        (Timeline::new_sequence("frame"), 2),
    ];

    let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
    let points3 = &[MyPoint::new(6.0, 7.0)];

    let colors4 = &[MyColor::from_rgb(1, 1, 1)];
    let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];

    let labels1 = &[MyLabel("a".into())];
    let labels2 = &[MyLabel("b".into())];
    let labels3 = &[MyLabel("c".into())];
    let labels4 = &[MyLabel("d".into())];
    let labels5 = &[MyLabel("e".into())];

    let chunk = Chunk::builder(entity_path)
        .with_sparse_component_batches(
            row_id1,
            timepoint1,
            [
                (MyPoints::descriptor_points(), Some(points1 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id2,
            timepoint2,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            timepoint3,
            [
                (MyPoints::descriptor_points(), Some(points3 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels3 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            timepoint4,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors4 as _)),
                (MyPoints::descriptor_labels(), Some(labels4 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id5,
            timepoint5,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors5 as _)),
                (MyPoints::descriptor_labels(), Some(labels5 as _)),
            ],
        )
        .build()?;

    eprintln!("chunk:\n{chunk}");

    {
        // Frames are 1,1,1,2,2: only the last row of each run (rows 3 and 5) must survive.
        let got = chunk.deduped_latest_on_index(&TimelineName::new("frame"));
        eprintln!("got:\n{got}");
        assert_eq!(2, got.num_rows());

        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
            (row_id3, mypoints_points_component, Some(points3 as _)),
            (row_id3, mypoints_colors_component, None),
            (row_id3, mypoints_labels_component, Some(labels3 as _)),
            //
            (row_id5, mypoints_points_component, None),
            (row_id5, mypoints_colors_component, Some(colors5 as _)),
            (row_id5, mypoints_labels_component, Some(labels5 as _)),
            //
            // Deduped-away rows must be gone entirely.
            (row_id1, mypoints_labels_component, None),
            (row_id2, mypoints_labels_component, None),
            (row_id4, mypoints_labels_component, None),
        ];

        // Query the deduped chunk: querying the original would prove nothing.
        for (row_id, component, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, *component));
        }
    }

    {
        // Log times are all distinct: nothing gets deduped.
        let got = chunk.deduped_latest_on_index(&TimelineName::log_time());
        eprintln!("got:\n{got}");
        assert_eq!(5, got.num_rows());

        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
            (row_id1, mypoints_points_component, Some(points1 as _)),
            (row_id1, mypoints_colors_component, None),
            (row_id1, mypoints_labels_component, Some(labels1 as _)),
            (row_id2, mypoints_points_component, None),
            (row_id2, mypoints_colors_component, None),
            (row_id2, mypoints_labels_component, Some(labels2 as _)),
            (row_id3, mypoints_points_component, Some(points3 as _)),
            (row_id3, mypoints_colors_component, None),
            (row_id3, mypoints_labels_component, Some(labels3 as _)),
            (row_id4, mypoints_points_component, None),
            (row_id4, mypoints_colors_component, Some(colors4 as _)),
            (row_id4, mypoints_labels_component, Some(labels4 as _)),
            (row_id5, mypoints_points_component, None),
            (row_id5, mypoints_colors_component, Some(colors5 as _)),
            (row_id5, mypoints_labels_component, Some(labels5 as _)),
        ];

        for (row_id, component, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, *component));
        }
    }

    Ok(())
}
1168
#[test]
fn dedupe_static() -> anyhow::Result<()> {
    let mypoints_points_component = MyPoints::descriptor_points().component;
    let mypoints_colors_component = MyPoints::descriptor_colors().component;
    let mypoints_labels_component = MyPoints::descriptor_labels().component;

    let entity_path = "my/entity";

    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();
    let row_id4 = RowId::new();
    let row_id5 = RowId::new();

    let timepoint_static = TimePoint::default();

    let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
    let points3 = &[MyPoint::new(6.0, 7.0)];

    let colors4 = &[MyColor::from_rgb(1, 1, 1)];
    let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];

    let labels1 = &[MyLabel("a".into())];
    let labels2 = &[MyLabel("b".into())];
    let labels3 = &[MyLabel("c".into())];
    let labels4 = &[MyLabel("d".into())];
    let labels5 = &[MyLabel("e".into())];

    let chunk = Chunk::builder(entity_path)
        .with_sparse_component_batches(
            row_id1,
            timepoint_static.clone(),
            [
                (MyPoints::descriptor_points(), Some(points1 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id2,
            timepoint_static.clone(),
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            timepoint_static.clone(),
            [
                (MyPoints::descriptor_points(), Some(points3 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels3 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            timepoint_static.clone(),
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors4 as _)),
                (MyPoints::descriptor_labels(), Some(labels4 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id5,
            timepoint_static.clone(),
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors5 as _)),
                (MyPoints::descriptor_labels(), Some(labels5 as _)),
            ],
        )
        .build()?;

    eprintln!("chunk:\n{chunk}");

    // Static chunks collapse to their last row, whatever timeline is requested.
    let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
        (row_id5, mypoints_points_component, None),
        (row_id5, mypoints_colors_component, Some(colors5 as _)),
        (row_id5, mypoints_labels_component, Some(labels5 as _)),
        //
        // Every other row must be gone entirely.
        (row_id1, mypoints_labels_component, None),
        (row_id2, mypoints_labels_component, None),
        (row_id3, mypoints_labels_component, None),
        (row_id4, mypoints_labels_component, None),
    ];

    {
        let got = chunk.deduped_latest_on_index(&TimelineName::new("frame"));
        eprintln!("got:\n{got}");
        assert_eq!(1, got.num_rows());

        // Query the deduped chunk: querying the original would prove nothing.
        for (row_id, component, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, *component));
        }
    }

    {
        let got = chunk.deduped_latest_on_index(&TimelineName::log_time());
        eprintln!("got:\n{got}");
        assert_eq!(1, got.num_rows());

        for (row_id, component, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, *component));
        }
    }

    Ok(())
}
1287
#[test]
fn filtered() -> anyhow::Result<()> {
    let mypoints_points_component = MyPoints::descriptor_points().component;
    let mypoints_colors_component = MyPoints::descriptor_colors().component;
    let mypoints_labels_component = MyPoints::descriptor_labels().component;

    let entity_path = "my/entity";

    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();
    let row_id4 = RowId::new();
    let row_id5 = RowId::new();

    let timepoint1 = [
        (Timeline::log_time(), 1000),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint2 = [
        (Timeline::log_time(), 1032),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint3 = [
        (Timeline::log_time(), 1064),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint4 = [
        (Timeline::log_time(), 1096),
        (Timeline::new_sequence("frame"), 2),
    ];
    let timepoint5 = [
        (Timeline::log_time(), 1128),
        (Timeline::new_sequence("frame"), 2),
    ];

    let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
    let points3 = &[MyPoint::new(6.0, 7.0)];

    let colors4 = &[MyColor::from_rgb(1, 1, 1)];
    let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];

    let labels1 = &[MyLabel("a".into())];
    let labels2 = &[MyLabel("b".into())];
    let labels3 = &[MyLabel("c".into())];
    let labels4 = &[MyLabel("d".into())];
    let labels5 = &[MyLabel("e".into())];

    let chunk = Chunk::builder(entity_path)
        .with_sparse_component_batches(
            row_id1,
            timepoint1,
            [
                (MyPoints::descriptor_points(), Some(points1 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id2,
            timepoint2,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            timepoint3,
            [
                (MyPoints::descriptor_points(), Some(points3 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels3 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            timepoint4,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors4 as _)),
                (MyPoints::descriptor_labels(), Some(labels4 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id5,
            timepoint5,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors5 as _)),
                (MyPoints::descriptor_labels(), Some(labels5 as _)),
            ],
        )
        .build()?;

    eprintln!("chunk:\n{chunk}");

    {
        // Keep every other row: rows 1, 3 and 5.
        let filter =
            ArrowBooleanArray::from((0..chunk.num_rows()).map(|i| i % 2 == 0).collect_vec());
        let got = chunk.filtered(&filter).unwrap();
        eprintln!("got:\n{got}");
        assert_eq!(
            filter.values().iter().filter(|&b| b).count(),
            got.num_rows()
        );

        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
            (row_id1, mypoints_points_component, Some(points1 as _)),
            (row_id1, mypoints_colors_component, None),
            (row_id1, mypoints_labels_component, Some(labels1 as _)),
            //
            (row_id3, mypoints_points_component, Some(points3 as _)),
            (row_id3, mypoints_colors_component, None),
            (row_id3, mypoints_labels_component, Some(labels3 as _)),
            //
            (row_id5, mypoints_points_component, None),
            (row_id5, mypoints_colors_component, Some(colors5 as _)),
            (row_id5, mypoints_labels_component, Some(labels5 as _)),
            //
            // Filtered-out rows must be gone entirely.
            (row_id2, mypoints_labels_component, None),
            (row_id4, mypoints_labels_component, None),
        ];

        // Query the filtered chunk: querying the original would prove nothing.
        for (row_id, component, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component} @ {row_id}");
            similar_asserts::assert_eq!(expected, got.cell(*row_id, *component));
        }
    }

    {
        // A filter shorter than the chunk is rejected.
        let filter = ArrowBooleanArray::from(
            (0..chunk.num_rows() / 2).map(|i| i % 2 == 0).collect_vec(),
        );
        let got = chunk.filtered(&filter);
        assert!(got.is_none());
    }

    {
        // A filter longer than the chunk is rejected.
        let filter = ArrowBooleanArray::from(
            (0..chunk.num_rows() * 2).map(|i| i % 2 == 0).collect_vec(),
        );
        let got = chunk.filtered(&filter);
        assert!(got.is_none());
    }

    Ok(())
}
1438
#[test]
fn taken() -> anyhow::Result<()> {
    use arrow::array::Int32Array as ArrowInt32Array;

    let mypoints_points_component = MyPoints::descriptor_points().component;
    let mypoints_colors_component = MyPoints::descriptor_colors().component;
    let mypoints_labels_component = MyPoints::descriptor_labels().component;

    let entity_path = "my/entity";

    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();
    let row_id4 = RowId::new();
    let row_id5 = RowId::new();

    let timepoint1 = [
        (Timeline::log_time(), 1000),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint2 = [
        (Timeline::log_time(), 1032),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint3 = [
        (Timeline::log_time(), 1064),
        (Timeline::new_sequence("frame"), 1),
    ];
    let timepoint4 = [
        (Timeline::log_time(), 1096),
        (Timeline::new_sequence("frame"), 2),
    ];
    let timepoint5 = [
        (Timeline::log_time(), 1128),
        (Timeline::new_sequence("frame"), 2),
    ];

    let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
    let points3 = &[MyPoint::new(6.0, 7.0)];

    let colors4 = &[MyColor::from_rgb(1, 1, 1)];
    let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];

    let labels1 = &[MyLabel("a".into())];
    let labels2 = &[MyLabel("b".into())];
    let labels3 = &[MyLabel("c".into())];
    let labels4 = &[MyLabel("d".into())];
    let labels5 = &[MyLabel("e".into())];

    // Five rows with a sparse mix of present (`Some`) and null (`None`) cells per component.
    let chunk = Chunk::builder(entity_path)
        .with_sparse_component_batches(
            row_id1,
            timepoint1,
            [
                (MyPoints::descriptor_points(), Some(points1 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id2,
            timepoint2,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            timepoint3,
            [
                (MyPoints::descriptor_points(), Some(points3 as _)),
                (MyPoints::descriptor_colors(), None),
                (MyPoints::descriptor_labels(), Some(labels3 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            timepoint4,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors4 as _)),
                (MyPoints::descriptor_labels(), Some(labels4 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id5,
            timepoint5,
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(colors5 as _)),
                (MyPoints::descriptor_labels(), Some(labels5 as _)),
            ],
        )
        .build()?;

    eprintln!("chunk:\n{chunk}");

    // Take every other row (indices 0, 2, 4), i.e. rows 1, 3 and 5.
    {
        let indices = ArrowInt32Array::from(
            (0..chunk.num_rows() as i32)
                .filter(|i| i % 2 == 0)
                .collect_vec(),
        );
        let got = chunk.taken(&indices);
        eprintln!("got:\n{got}");
        assert_eq!(indices.len(), got.num_rows());
        assert_eq!(
            vec![row_id1, row_id3, row_id5],
            got.row_ids().collect_vec()
        );

        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
            (row_id1, mypoints_points_component, Some(points1 as _)),
            (row_id1, mypoints_colors_component, None),
            (row_id1, mypoints_labels_component, Some(labels1 as _)),
            (row_id3, mypoints_points_component, Some(points3 as _)),
            (row_id3, mypoints_colors_component, None),
            (row_id3, mypoints_labels_component, Some(labels3 as _)),
            (row_id5, mypoints_points_component, None),
            (row_id5, mypoints_colors_component, Some(colors5 as _)),
            (row_id5, mypoints_labels_component, Some(labels5 as _)),
        ];

        for (row_id, component, expected) in expectations {
            let expected = expected
                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
            eprintln!("{component} @ {row_id}");
            // Query the *taken* chunk: querying the source chunk here would make the
            // assertion independent of what `taken` actually produced.
            similar_asserts::assert_eq!(expected, got.cell(*row_id, *component));
        }
    }

    // Repeated indices are allowed: every output row is a copy of row 3 (index 2), and the
    // output may be longer than the input.
    {
        let indices = ArrowInt32Array::from(
            std::iter::repeat_n(2i32, chunk.num_rows() * 2).collect_vec(),
        );
        let got = chunk.taken(&indices);
        eprintln!("got:\n{got}");
        assert_eq!(indices.len(), got.num_rows());
        assert!(got.row_ids().all(|row_id| row_id == row_id3));
    }

    Ok(())
}
1583
#[test]
fn slice_memory_size_conservation() -> anyhow::Result<()> {
    use arrow::array::{ListArray as ArrowListArray, UInt8Array as ArrowUInt8Array};
    use arrow::buffer::OffsetBuffer as ArrowOffsetBuffer;
    use re_byte_size::SizeBytes as _;
    use re_types_core::{ComponentDescriptor, SerializedComponentColumn};

    let entity_path = "test/entity";

    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();

    // Three blobs of clearly different sizes, so that each slice's size is dominated by its
    // blob payload rather than by fixed per-chunk overhead.
    let blob_size_1 = 10_000;
    let blob_size_2 = 20_000;
    let blob_size_3 = 30_000;

    // The byte pattern is irrelevant to the size accounting; cycle through the full `u8` range
    // and lay the three blobs out back-to-back in a single values buffer.
    let all_blob_data: Vec<u8> = [blob_size_1, blob_size_2, blob_size_3]
        .into_iter()
        .flat_map(|size| (0..=u8::MAX).cycle().take(size))
        .collect();

    let values_array = ArrowUInt8Array::from(all_blob_data);

    // One list entry per row, each entry spanning exactly one blob of the shared values buffer.
    let list_array = ArrowListArray::new(
        arrow::datatypes::Field::new("item", arrow::datatypes::DataType::UInt8, false).into(),
        ArrowOffsetBuffer::from_lengths([blob_size_1, blob_size_2, blob_size_3]),
        std::sync::Arc::new(values_array),
        None,
    );

    let blob_descriptor = ComponentDescriptor::partial("blob");
    let component_column = SerializedComponentColumn::new(list_array, blob_descriptor);

    let chunk = Chunk::new(
        crate::ChunkId::new(),
        re_log_types::EntityPath::from(entity_path),
        Some(true), // presumably `is_sorted` (matches `Chunk`'s field order)
        RowId::arrow_from_slice(&[row_id1, row_id2, row_id3]),
        std::iter::once((
            *Timeline::new_sequence("frame").name(),
            crate::TimeColumn::new_sequence("frame", [1, 2, 3]),
        ))
        .collect(),
        std::iter::once(component_column).collect(),
    )?;

    let original_size = chunk.heap_size_bytes();
    eprintln!("Original chunk size: {original_size} bytes");

    // Deep-slice every row into its own single-row chunk.
    let slice1 = chunk.row_sliced_deep(0, 1);
    let slice2 = chunk.row_sliced_deep(1, 1);
    let slice3 = chunk.row_sliced_deep(2, 1);

    let slice1_size = slice1.heap_size_bytes();
    let slice2_size = slice2.heap_size_bytes();
    let slice3_size = slice3.heap_size_bytes();

    eprintln!("Slice 1 size: {slice1_size} bytes ({blob_size_1} byte blob)");
    eprintln!("Slice 2 size: {slice2_size} bytes ({blob_size_2} byte blob)");
    eprintln!("Slice 3 size: {slice3_size} bytes ({blob_size_3} byte blob)");

    let total_slice_size = slice1_size + slice2_size + slice3_size;
    eprintln!("Total slices size: {total_slice_size} bytes");

    // Deep slices must not each account for the whole parent buffer. Together they should
    // cost about what the original did, plus a small fixed per-chunk overhead.
    let acceptable_overhead = 650;
    assert!(
        total_slice_size <= original_size + acceptable_overhead,
        "Slices total size ({total_slice_size}) should not exceed original size ({original_size}) by more than {acceptable_overhead} bytes of overhead",
    );

    // Slice sizes must be ordered like their blob sizes.
    assert!(
        slice3_size > slice1_size,
        "Slice 3 with {blob_size_3} bytes ({slice3_size} total bytes) should be larger than slice 1 with {blob_size_1} bytes ({slice1_size} total bytes)",
    );

    assert!(
        slice2_size > slice1_size,
        "Slice 2 with {blob_size_2} bytes ({slice2_size} total bytes) should be larger than slice 1 with {blob_size_1} bytes ({slice1_size} total bytes)",
    );

    assert!(
        slice3_size > slice2_size,
        "Slice 3 with {blob_size_3} bytes ({slice3_size} total bytes) should be larger than slice 2 with {blob_size_2} bytes ({slice2_size} total bytes)",
    );

    // The size ratio should track the blob-size ratio. The tolerance of 1.0 leaves room for
    // the fixed per-chunk overhead, which weighs more on the smaller slice.
    let size_ratio_3_to_1 = slice3_size as f64 / slice1_size as f64;
    let expected_ratio_3_to_1 = blob_size_3 as f64 / blob_size_1 as f64;
    assert!(
        (size_ratio_3_to_1 - expected_ratio_3_to_1).abs() < 1.0,
        "Size ratio between slice 3 and slice 1 ({size_ratio_3_to_1:.2}) should be close to expected blob ratio ({expected_ratio_3_to_1:.2})",
    );

    eprintln!("✓ Raw arrow array slice memory calculation test passed!");

    Ok(())
}
1699}