1use std::sync::atomic::{AtomicU64, Ordering};
2
3use ahash::HashMap;
4use anyhow::Context as _;
5use arrow::{
6 array::{
7 Array as ArrowArray, ArrayRef as ArrowArrayRef, FixedSizeBinaryArray,
8 ListArray as ArrowListArray,
9 },
10 buffer::{NullBuffer as ArrowNullBuffer, ScalarBuffer as ArrowScalarBuffer},
11};
12use itertools::{Either, Itertools as _, izip};
13use nohash_hasher::IntMap;
14
15use re_arrow_util::{ArrowArrayDowncastRef as _, widen_binary_arrays};
16use re_byte_size::SizeBytes as _;
17use re_log_types::{
18 AbsoluteTimeRange, EntityPath, NonMinI64, TimeInt, TimeType, Timeline, TimelineName,
19};
20use re_types_core::{
21 ComponentDescriptor, ComponentIdentifier, ComponentType, DeserializationError, Loggable as _,
22 SerializationError, SerializedComponentColumn,
23};
24
25use crate::{ChunkId, RowId};
26
/// Errors that can occur while building, validating or converting a [`Chunk`].
#[derive(thiserror::Error, Debug)]
pub enum ChunkError {
    /// The chunk violates one of its internal invariants (see [`Chunk::sanity_check`]).
    #[error("Detected malformed Chunk: {reason}")]
    Malformed { reason: String },

    /// Low-level arrow error, forwarded as-is.
    #[error("Arrow: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// An index (row, component, …) was out of bounds for the given collection.
    #[error("{kind} index out of bounds: {index} (len={len})")]
    IndexOutOfBounds {
        kind: String,
        len: usize,
        index: usize,
    },

    /// Failure while serializing components into arrow data.
    #[error("Serialization: {0}")]
    Serialization(#[from] SerializationError),

    /// Failure while deserializing arrow data back into components.
    #[error("Deserialization: {0}")]
    Deserialization(#[from] DeserializationError),

    #[error(transparent)]
    UnsupportedTimeType(#[from] re_sorbet::UnsupportedTimeType),

    #[error(transparent)]
    WrongDatatypeError(#[from] re_sorbet::WrongDatatypeError),

    #[error(transparent)]
    MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),

    #[error(transparent)]
    InvalidSorbetSchema(#[from] re_sorbet::SorbetError),
}
64
// Compile-time guard: `ChunkError` travels inside every `ChunkResult`, so a large error
// type would bloat every `Result` return value and every `?` propagation.
const _: () = assert!(
    std::mem::size_of::<ChunkError>() <= 72,
    "Error type is too large. Try to reduce its size by boxing some of its variants.",
);

/// Convenience alias for results whose error type is [`ChunkError`].
pub type ChunkResult<T> = Result<T, ChunkError>;
71
/// The component columns of a [`Chunk`], keyed by [`ComponentIdentifier`].
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ChunkComponents(pub IntMap<ComponentIdentifier, SerializedComponentColumn>);
76
77impl ChunkComponents {
78 #[inline]
82 pub fn get_by_component_type(
83 &self,
84 component_type: ComponentType,
85 ) -> impl Iterator<Item = &ArrowListArray> {
86 self.0.values().filter_map(move |column| {
87 (column.descriptor.component_type == Some(component_type)).then_some(&column.list_array)
88 })
89 }
90
91 pub fn ensure_similar(left: &Self, right: &Self) -> anyhow::Result<()> {
99 anyhow::ensure!(left.len() == right.len());
100 for (component, left_column) in left.iter() {
101 let Some(right_column) = right.get(*component) else {
102 anyhow::bail!("rhs is missing {component:?}");
103 };
104 anyhow::ensure!(left_column.descriptor == right_column.descriptor);
105
106 let left_array = widen_binary_arrays(&left_column.list_array);
107 let right_array = widen_binary_arrays(&right_column.list_array);
108 re_arrow_util::ensure_similar(&left_array.to_data(), &right_array.to_data())
109 .with_context(|| format!("Component {component:?}"))?;
110 }
111 Ok(())
112 }
113
114 #[inline]
116 pub fn contains_component(&self, component: ComponentIdentifier) -> bool {
117 self.0.contains_key(&component)
118 }
119
120 #[inline]
122 pub fn component_descriptors(&self) -> impl Iterator<Item = &ComponentDescriptor> + '_ {
123 self.0.values().map(|column| &column.descriptor)
124 }
125
126 #[inline]
128 pub fn list_arrays(&self) -> impl Iterator<Item = &ArrowListArray> + '_ {
129 self.0.values().map(|column| &column.list_array)
130 }
131
132 #[inline]
134 pub fn list_arrays_mut(&mut self) -> impl Iterator<Item = &mut ArrowListArray> + '_ {
135 self.0.values_mut().map(|column| &mut column.list_array)
136 }
137
138 #[inline]
140 pub fn get_array(&self, component: ComponentIdentifier) -> Option<&ArrowListArray> {
141 self.0.get(&component).map(|column| &column.list_array)
142 }
143
144 #[inline]
146 pub fn get_descriptor(&self, component: ComponentIdentifier) -> Option<&ComponentDescriptor> {
147 self.0.get(&component).map(|column| &column.descriptor)
148 }
149
150 #[inline]
152 pub fn get(&self, component: ComponentIdentifier) -> Option<&SerializedComponentColumn> {
153 self.0.get(&component)
154 }
155
156 #[inline]
160 pub fn insert(
161 &mut self,
162 column: SerializedComponentColumn,
163 ) -> Option<SerializedComponentColumn> {
164 self.0.insert(column.descriptor.component, column)
165 }
166}
167
impl std::ops::Deref for ChunkComponents {
    type Target = IntMap<ComponentIdentifier, SerializedComponentColumn>;

    // Expose the underlying map's read API (`len`, `iter`, `keys`, …) directly.
    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
176
impl std::ops::DerefMut for ChunkComponents {
    // Expose the underlying map's mutating API directly.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
183
184impl FromIterator<(ComponentDescriptor, ArrowListArray)> for ChunkComponents {
186 #[inline]
187 fn from_iter<T: IntoIterator<Item = (ComponentDescriptor, ArrowListArray)>>(iter: T) -> Self {
188 let mut this = Self::default();
189 {
190 for (component_desc, list_array) in iter {
191 this.insert(SerializedComponentColumn::new(list_array, component_desc));
192 }
193 }
194 this
195 }
196}
197
198impl FromIterator<SerializedComponentColumn> for ChunkComponents {
199 #[inline]
200 fn from_iter<T: IntoIterator<Item = SerializedComponentColumn>>(iter: T) -> Self {
201 let mut this = Self::default();
202 {
203 for serialized in iter {
204 this.insert(serialized);
205 }
206 }
207 this
208 }
209}
210
/// Dense storage of a batch of rows for a single entity: one row-id column, zero or more
/// time columns, and one list array per component.
#[derive(Debug)]
pub struct Chunk {
    /// Identifier of this chunk.
    pub(crate) id: ChunkId,

    /// The entity that every row in this chunk belongs to.
    pub(crate) entity_path: EntityPath,

    /// Lazily-computed cache of the chunk's heap size; `0` means "not computed yet".
    /// Atomic so it can be filled in from `&self` (see the `SizeBytes` impl).
    pub(crate) heap_size_bytes: AtomicU64,

    /// Is the chunk sorted by row-id?
    pub(crate) is_sorted: bool,

    /// One fixed-size binary row-id per row (see `RowId::arrow_datatype`).
    pub(crate) row_ids: FixedSizeBinaryArray,

    /// One time column per timeline; empty iff the chunk is static.
    pub(crate) timelines: IntMap<TimelineName, TimeColumn>,

    /// The actual component data, one list array (one row per chunk row) per component.
    pub(crate) components: ChunkComponents,
}
252
impl PartialEq for Chunk {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive destructuring so that adding a field forces this comparison to be
        // revisited. `heap_size_bytes` is a lazily-computed cache and is deliberately
        // excluded from equality.
        let Self {
            id,
            entity_path,
            heap_size_bytes: _,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        *id == other.id
            && *entity_path == other.entity_path
            && *is_sorted == other.is_sorted
            && *row_ids == other.row_ids
            && *timelines == other.timelines
            && *components == other.components
    }
}
274
275impl Chunk {
276 #[must_use]
282 #[inline]
283 pub fn with_id(mut self, id: ChunkId) -> Self {
284 self.id = id;
285 self
286 }
287
288 pub fn ensure_similar(lhs: &Self, rhs: &Self) -> anyhow::Result<()> {
298 let Self {
299 id: _,
300 entity_path,
301 heap_size_bytes: _,
302 is_sorted: _,
303 row_ids: _,
304 timelines,
305 components,
306 } = lhs;
307
308 anyhow::ensure!(*entity_path == rhs.entity_path);
309
310 anyhow::ensure!(timelines.keys().collect_vec() == rhs.timelines.keys().collect_vec());
311
312 for (timeline, left_time_col) in timelines {
313 let right_time_col = rhs
314 .timelines
315 .get(timeline)
316 .ok_or_else(|| anyhow::format_err!("right is missing timeline {timeline:?}"))?;
317 if timeline == &TimelineName::log_time() {
318 continue; }
320 if timeline == "sim_time" {
321 continue; }
323 anyhow::ensure!(
324 left_time_col == right_time_col,
325 "Timeline differs: {timeline:?}"
326 );
327 }
328
329 if entity_path == &EntityPath::properties() {
331 anyhow::ensure!(components.len() == rhs.components.len());
334
335 let recording_time_component: ComponentIdentifier = "RecordingInfo:start_time".into();
337
338 let lhs_components = components
340 .iter()
341 .filter(|&(component, _list_array)| component != &recording_time_component)
342 .map(|(component, list_array)| (*component, list_array.clone()))
343 .collect::<IntMap<_, _>>();
344 let rhs_components = rhs
345 .components
346 .iter()
347 .filter(|&(component, _list_array)| component != &recording_time_component)
348 .map(|(component, list_array)| (*component, list_array.clone()))
349 .collect::<IntMap<_, _>>();
350
351 anyhow::ensure!(lhs_components == rhs_components);
352 Ok(())
353 } else {
354 ChunkComponents::ensure_similar(components, &rhs.components)
355 }
356 }
357
358 pub fn are_equal(&self, other: &Self) -> bool {
360 let Self {
361 id,
362 entity_path,
363 heap_size_bytes: _,
364 is_sorted,
365 row_ids,
366 timelines,
367 components,
368 } = self;
369
370 *id == other.id
371 && *entity_path == other.entity_path
372 && *is_sorted == other.is_sorted
373 && row_ids == &other.row_ids
374 && *timelines == other.timelines
375 && components.0 == other.components.0
376 }
377}
378
impl Clone for Chunk {
    // Hand-written because `AtomicU64` is not `Clone`: the cached heap size is copied
    // into a fresh atomic.
    #[inline]
    fn clone(&self) -> Self {
        Self {
            id: self.id,
            entity_path: self.entity_path.clone(),
            heap_size_bytes: AtomicU64::new(self.heap_size_bytes.load(Ordering::Relaxed)),
            is_sorted: self.is_sorted,
            row_ids: self.row_ids.clone(),
            timelines: self.timelines.clone(),
            components: self.components.clone(),
        }
    }
}
393
394impl Chunk {
395 #[inline]
400 pub fn clone_as(&self, id: ChunkId, first_row_id: RowId) -> Self {
401 let row_ids = std::iter::from_fn({
402 let mut row_id = first_row_id;
403 move || {
404 let yielded = row_id;
405 row_id = row_id.next();
406 Some(yielded)
407 }
408 })
409 .take(self.row_ids.len())
410 .collect_vec();
411
412 Self {
413 id,
414 row_ids: RowId::arrow_from_slice(&row_ids),
415 ..self.clone()
416 }
417 }
418
419 #[inline]
421 pub fn into_static(mut self) -> Self {
422 self.timelines.clear();
423 self
424 }
425
426 pub fn zeroed(self) -> Self {
428 let row_ids = vec![RowId::ZERO; self.row_ids.len()];
429
430 let row_ids = RowId::arrow_from_slice(&row_ids);
431
432 Self { row_ids, ..self }
433 }
434
435 #[inline]
444 pub fn time_range_per_component(
445 &self,
446 ) -> IntMap<TimelineName, IntMap<ComponentIdentifier, AbsoluteTimeRange>> {
447 re_tracing::profile_function!();
448
449 self.timelines
450 .iter()
451 .map(|(timeline_name, time_column)| {
452 (
453 *timeline_name,
454 time_column.time_range_per_component(&self.components),
455 )
456 })
457 .collect()
458 }
459
460 #[inline]
461 pub fn component_descriptors(&self) -> impl Iterator<Item = &ComponentDescriptor> + '_ {
462 self.components.component_descriptors()
463 }
464
465 #[inline]
471 pub fn num_events_cumulative(&self) -> u64 {
472 self.components
474 .list_arrays()
475 .map(|list_array| {
476 list_array.nulls().map_or_else(
477 || list_array.len() as u64,
478 |validity| validity.len() as u64 - validity.null_count() as u64,
479 )
480 })
481 .sum()
482 }
483
484 pub fn num_events_cumulative_per_unique_time(
492 &self,
493 timeline: &TimelineName,
494 ) -> Vec<(TimeInt, u64)> {
495 re_tracing::profile_function!();
496
497 if self.is_static() {
498 return vec![(TimeInt::STATIC, self.num_events_cumulative())];
499 }
500
501 let Some(time_column) = self.timelines().get(timeline) else {
502 return Vec::new();
503 };
504
505 let time_range = time_column.time_range();
506 if time_range.min() == time_range.max() {
507 return vec![(time_range.min(), self.num_events_cumulative())];
508 }
509
510 let counts = if time_column.is_sorted() {
511 self.num_events_cumulative_per_unique_time_sorted(time_column)
512 } else {
513 self.num_events_cumulative_per_unique_time_unsorted(time_column)
514 };
515
516 debug_assert!(
517 counts
518 .iter()
519 .tuple_windows::<(_, _)>()
520 .all(|((time1, _), (time2, _))| time1 < time2)
521 );
522
523 counts
524 }
525
526 fn num_events_cumulative_per_unique_time_sorted(
527 &self,
528 time_column: &TimeColumn,
529 ) -> Vec<(TimeInt, u64)> {
530 re_tracing::profile_function!();
531
532 debug_assert!(time_column.is_sorted());
533
534 let mut counts_raw = vec![0u64; self.num_rows()];
539 {
540 self.components.list_arrays().for_each(|list_array| {
541 if let Some(validity) = list_array.nulls() {
542 validity
543 .iter()
544 .enumerate()
545 .for_each(|(i, is_valid)| counts_raw[i] += is_valid as u64);
546 } else {
547 for count in &mut counts_raw {
548 *count += 1;
549 }
550 }
551 });
552 }
553
554 let mut counts = Vec::with_capacity(counts_raw.len());
555
556 let Some(mut cur_time) = time_column.times().next() else {
557 return Vec::new();
558 };
559 let mut cur_count = 0;
560 izip!(time_column.times(), counts_raw).for_each(|(time, count)| {
561 if time == cur_time {
562 cur_count += count;
563 } else {
564 counts.push((cur_time, cur_count));
565 cur_count = count;
566 cur_time = time;
567 }
568 });
569
570 if counts.last().map(|(time, _)| *time) != Some(cur_time) {
571 counts.push((cur_time, cur_count));
572 }
573
574 counts
575 }
576
577 fn num_events_cumulative_per_unique_time_unsorted(
578 &self,
579 time_column: &TimeColumn,
580 ) -> Vec<(TimeInt, u64)> {
581 re_tracing::profile_function!();
582
583 debug_assert!(!time_column.is_sorted());
584
585 let result_unordered =
588 self.components
589 .list_arrays()
590 .fold(HashMap::default(), |acc, list_array| {
591 if let Some(validity) = list_array.nulls() {
592 time_column.times().zip(validity.iter()).fold(
593 acc,
594 |mut acc, (time, is_valid)| {
595 *acc.entry(time).or_default() += is_valid as u64;
596 acc
597 },
598 )
599 } else {
600 time_column.times().fold(acc, |mut acc, time| {
601 *acc.entry(time).or_default() += 1;
602 acc
603 })
604 }
605 });
606
607 let mut result = result_unordered.into_iter().collect_vec();
608 result.sort_by_key(|val| val.0);
609 result
610 }
611
612 #[inline]
618 pub fn num_events_for_component(&self, component: ComponentIdentifier) -> Option<u64> {
619 self.components.get_array(component).map(|list_array| {
621 list_array.nulls().map_or_else(
622 || list_array.len() as u64,
623 |validity| validity.len() as u64 - validity.null_count() as u64,
624 )
625 })
626 }
627
628 pub fn row_id_range_per_component(&self) -> IntMap<ComponentIdentifier, (RowId, RowId)> {
637 re_tracing::profile_function!();
638
639 let row_ids = self.row_ids().collect_vec();
640
641 if self.is_sorted() {
642 self.components
643 .iter()
644 .filter_map(|(component, column)| {
645 let mut row_id_min = None;
646 let mut row_id_max = None;
647
648 for (i, &row_id) in row_ids.iter().enumerate() {
649 if column.list_array.is_valid(i) {
650 row_id_min = Some(row_id);
651 }
652 }
653 for (i, &row_id) in row_ids.iter().enumerate().rev() {
654 if column.list_array.is_valid(i) {
655 row_id_max = Some(row_id);
656 }
657 }
658
659 Some((*component, (row_id_min?, row_id_max?)))
660 })
661 .collect()
662 } else {
663 self.components
664 .iter()
665 .filter_map(|(component, column)| {
666 let mut row_id_min = Some(RowId::MAX);
667 let mut row_id_max = Some(RowId::ZERO);
668
669 for (i, &row_id) in row_ids.iter().enumerate() {
670 if column.list_array.is_valid(i) && Some(row_id) > row_id_min {
671 row_id_min = Some(row_id);
672 }
673 }
674 for (i, &row_id) in row_ids.iter().enumerate().rev() {
675 if column.list_array.is_valid(i) && Some(row_id) < row_id_max {
676 row_id_max = Some(row_id);
677 }
678 }
679
680 Some((*component, (row_id_min?, row_id_max?)))
681 })
682 .collect()
683 }
684 }
685}
686
/// A single dense column of timestamps for one timeline of a [`Chunk`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeColumn {
    /// Which timeline these timestamps belong to.
    pub(crate) timeline: Timeline,

    /// The raw timestamps, one per chunk row; always dense (no nulls).
    pub(crate) times: ArrowScalarBuffer<i64>,

    /// Are the timestamps sorted in ascending order?
    pub(crate) is_sorted: bool,

    /// Cached inclusive (min, max) range of `times` (see `TimeColumn::new`).
    pub(crate) time_range: AbsoluteTimeRange,
}
716
/// Errors that can occur when interpreting an arrow array as a time column.
#[derive(Debug, thiserror::Error)]
pub enum TimeColumnError {
    /// Time columns must be dense; a null timestamp has no meaning.
    #[error("Time columns had nulls, but should be dense")]
    ContainsNulls,

    /// The arrow datatype is not one of the supported `i64`-backed time types.
    #[error("Unsupported data type : {0}")]
    UnsupportedDataType(arrow::datatypes::DataType),
}
726
727impl Chunk {
728 pub fn new(
738 id: ChunkId,
739 entity_path: EntityPath,
740 is_sorted: Option<bool>,
741 row_ids: FixedSizeBinaryArray,
742 timelines: IntMap<TimelineName, TimeColumn>,
743 components: ChunkComponents,
744 ) -> ChunkResult<Self> {
745 let mut chunk = Self {
746 id,
747 entity_path,
748 heap_size_bytes: AtomicU64::new(0),
749 is_sorted: false,
750 row_ids,
751 timelines,
752 components,
753 };
754
755 chunk.is_sorted = is_sorted.unwrap_or_else(|| chunk.is_sorted_uncached());
756
757 chunk.sanity_check()?;
758
759 Ok(chunk)
760 }
761
762 pub fn from_native_row_ids(
772 id: ChunkId,
773 entity_path: EntityPath,
774 is_sorted: Option<bool>,
775 row_ids: &[RowId],
776 timelines: IntMap<TimelineName, TimeColumn>,
777 components: ChunkComponents,
778 ) -> ChunkResult<Self> {
779 re_tracing::profile_function!();
780 let row_ids = RowId::arrow_from_slice(row_ids);
781 Self::new(id, entity_path, is_sorted, row_ids, timelines, components)
782 }
783
784 pub fn from_auto_row_ids(
792 id: ChunkId,
793 entity_path: EntityPath,
794 timelines: IntMap<TimelineName, TimeColumn>,
795 components: ChunkComponents,
796 ) -> ChunkResult<Self> {
797 let count = components
798 .list_arrays()
799 .next()
800 .map_or(0, |list_array| list_array.len());
801
802 let row_ids = std::iter::from_fn({
803 let tuid: re_tuid::Tuid = *id;
804 let mut row_id = RowId::from_tuid(tuid.next());
805 move || {
806 let yielded = row_id;
807 row_id = row_id.next();
808 Some(yielded)
809 }
810 })
811 .take(count)
812 .collect_vec();
813
814 Self::from_native_row_ids(id, entity_path, Some(true), &row_ids, timelines, components)
815 }
816
817 #[inline]
821 pub fn new_static(
822 id: ChunkId,
823 entity_path: EntityPath,
824 is_sorted: Option<bool>,
825 row_ids: FixedSizeBinaryArray,
826 components: ChunkComponents,
827 ) -> ChunkResult<Self> {
828 Self::new(
829 id,
830 entity_path,
831 is_sorted,
832 row_ids,
833 Default::default(),
834 components,
835 )
836 }
837
838 #[inline]
839 pub fn empty(id: ChunkId, entity_path: EntityPath) -> Self {
840 Self {
841 id,
842 entity_path,
843 heap_size_bytes: Default::default(),
844 is_sorted: true,
845 row_ids: RowId::arrow_from_slice(&[]),
846 timelines: Default::default(),
847 components: Default::default(),
848 }
849 }
850
851 #[inline]
857 pub fn add_component(
858 &mut self,
859 component_column: SerializedComponentColumn,
860 ) -> ChunkResult<()> {
861 self.components.insert(component_column);
862 self.sanity_check()
863 }
864
865 #[inline]
871 pub fn add_timeline(&mut self, chunk_timeline: TimeColumn) -> ChunkResult<()> {
872 self.timelines
873 .insert(*chunk_timeline.timeline.name(), chunk_timeline);
874 self.sanity_check()
875 }
876}
877
impl TimeColumn {
    /// Builds a new [`TimeColumn`].
    ///
    /// If `is_sorted` is `None`, sortedness is computed from the data.
    /// The cached time range is taken from the endpoints when sorted, and from a full
    /// min/max scan otherwise; an empty column gets the (MIN, MAX) sentinel range.
    pub fn new(is_sorted: Option<bool>, timeline: Timeline, times: ArrowScalarBuffer<i64>) -> Self {
        re_tracing::profile_function_if!(1000 < times.len(), format!("{} times", times.len()));

        let time_slice = times.as_ref();

        let is_sorted =
            is_sorted.unwrap_or_else(|| time_slice.windows(2).all(|times| times[0] <= times[1]));

        let time_range = if is_sorted {
            // Sorted: the endpoints are the extrema.
            let min_time = time_slice
                .first()
                .copied()
                .map_or(TimeInt::MIN, TimeInt::new_temporal);
            let max_time = time_slice
                .last()
                .copied()
                .map_or(TimeInt::MAX, TimeInt::new_temporal);
            AbsoluteTimeRange::new(min_time, max_time)
        } else {
            // Unsorted: full scan.
            let min_time = time_slice
                .iter()
                .min()
                .copied()
                .map_or(TimeInt::MIN, TimeInt::new_temporal);
            let max_time = time_slice
                .iter()
                .max()
                .copied()
                .map_or(TimeInt::MAX, TimeInt::new_temporal);
            AbsoluteTimeRange::new(min_time, max_time)
        };

        Self {
            timeline,
            times,
            is_sorted,
            time_range,
        }
    }

    /// Builds a sequence-typed time column; illegal values are clamped to the minimum
    /// legal value (and an error is logged).
    pub fn new_sequence(
        name: impl Into<re_log_types::TimelineName>,
        times: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec: Vec<_> = times.into_iter().map(|t| {
            let t = t.into();
            TimeInt::try_from(t)
                .unwrap_or_else(|_| {
                    re_log::error!(
                        illegal_value = t,
                        new_value = TimeInt::MIN.as_i64(),
                        "TimeColumn::new_sequence() called with illegal value - clamped to minimum legal value"
                    );
                    TimeInt::MIN
                })
                .as_i64()
        }).collect();

        Self::new(
            None,
            Timeline::new_sequence(name.into()),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Builds a duration-typed time column from seconds; values are converted to
    /// nanoseconds and saturated into the legal range (with a warning on clamp).
    pub fn new_duration_secs(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        let time_vec = seconds.into_iter().map(|seconds| {
            let seconds = seconds.into();
            let nanos = (1e9 * seconds).round();
            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
            if clamped.get() as f64 != nanos {
                re_log::warn!(
                    illegal_value = nanos,
                    new_value = clamped.get(),
                    "TimeColumn::new_duration_secs() called with out-of-range value. Clamped to valid range."
                );
            }
            clamped.get()
        }).collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::DurationNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Builds a timestamp-typed time column from seconds since the unix epoch; values
    /// are converted to nanoseconds and saturated into the legal range.
    pub fn new_timestamp_secs_since_epoch(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        let time_vec = seconds.into_iter().map(|seconds| {
            let seconds = seconds.into();
            let nanos = (1e9 * seconds).round();
            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
            if clamped.get() as f64 != nanos {
                re_log::warn!(
                    illegal_value = nanos,
                    new_value = clamped.get(),
                    "TimeColumn::new_timestamp_secs_since_epoch() called with out-of-range value. Clamped to valid range."
                );
            }
            clamped.get()
        }).collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::TimestampNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Deprecated alias for [`Self::new_duration_secs`].
    #[deprecated = "Use `TimeColumn::new_duration_secs` or `new_timestamp_secs_since_epoch` instead"]
    pub fn new_seconds(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        Self::new_duration_secs(name, seconds)
    }

    /// Builds a duration-typed time column from nanoseconds; illegal values are clamped
    /// to the minimum legal value (and an error is logged).
    pub fn new_duration_nanos(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec = nanos
            .into_iter()
            .map(|nanos| {
                let nanos = nanos.into();
                NonMinI64::new(nanos)
                    .unwrap_or_else(|| {
                        re_log::error!(
                            illegal_value = nanos,
                            new_value = TimeInt::MIN.as_i64(),
                            "TimeColumn::new_duration_nanos() called with illegal value - clamped to minimum legal value"
                        );
                        NonMinI64::MIN
                    })
                    .get()
            })
            .collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::DurationNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Builds a timestamp-typed time column from nanoseconds since the unix epoch;
    /// illegal values are clamped to the minimum legal value.
    pub fn new_timestamp_nanos_since_epoch(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec = nanos
            .into_iter()
            .map(|nanos| {
                let nanos = nanos.into();
                NonMinI64::new(nanos)
                    .unwrap_or_else(|| {
                        re_log::error!(
                            illegal_value = nanos,
                            new_value = TimeInt::MIN.as_i64(),
                            "TimeColumn::new_timestamp_nanos_since_epoch() called with illegal value - clamped to minimum legal value"
                        );
                        NonMinI64::MIN
                    })
                    .get()
            })
            .collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::TimestampNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Deprecated alias for [`Self::new_duration_nanos`].
    #[deprecated = "Use `TimeColumn::new_duration_nanos` or `new_timestamp_nanos_since_epoch` instead"]
    pub fn new_nanos(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        Self::new_duration_nanos(name, nanos)
    }

    /// Reads a dense (null-free) time array into a raw `i64` buffer.
    ///
    /// # Errors
    ///
    /// Fails if the array contains nulls or is of an unsupported datatype.
    pub fn read_array(array: &dyn ArrowArray) -> Result<ArrowScalarBuffer<i64>, TimeColumnError> {
        if array.null_count() > 0 {
            Err(TimeColumnError::ContainsNulls)
        } else {
            Self::read_nullable_array(array).map(|(times, _nulls)| times)
        }
    }

    /// Reads a possibly-nullable time array into a raw `i64` buffer plus its validity
    /// bitmap, accepting any of the supported `i64`-backed arrow time types.
    pub fn read_nullable_array(
        array: &dyn ArrowArray,
    ) -> Result<(ArrowScalarBuffer<i64>, Option<ArrowNullBuffer>), TimeColumnError> {
        if let Some(times) = array.downcast_array_ref::<arrow::array::Int64Array>() {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::TimestampNanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::Time64NanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::DurationNanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else {
            Err(TimeColumnError::UnsupportedDataType(
                array.data_type().clone(),
            ))
        }
    }
}
1121
1122impl Chunk {
1125 #[inline]
1126 pub fn id(&self) -> ChunkId {
1127 self.id
1128 }
1129
1130 #[inline]
1131 pub fn entity_path(&self) -> &EntityPath {
1132 &self.entity_path
1133 }
1134
1135 #[inline]
1137 pub fn num_columns(&self) -> usize {
1138 let Self {
1139 id: _,
1140 entity_path: _, heap_size_bytes: _,
1142 is_sorted: _,
1143 row_ids: _,
1144 timelines,
1145 components,
1146 } = self;
1147
1148 1 + timelines.len() + components.len()
1149 }
1150
1151 #[inline]
1152 pub fn num_controls(&self) -> usize {
1153 _ = self;
1154 1 }
1156
1157 #[inline]
1158 pub fn num_timelines(&self) -> usize {
1159 self.timelines.len()
1160 }
1161
1162 #[inline]
1163 pub fn num_components(&self) -> usize {
1164 self.components.len()
1165 }
1166
1167 #[inline]
1168 pub fn num_rows(&self) -> usize {
1169 self.row_ids.len()
1170 }
1171
1172 #[inline]
1173 pub fn is_empty(&self) -> bool {
1174 self.num_rows() == 0
1175 }
1176
1177 #[inline]
1178 pub fn row_ids_array(&self) -> &FixedSizeBinaryArray {
1179 &self.row_ids
1180 }
1181
1182 #[inline]
1183 pub fn row_ids_slice(&self) -> &[RowId] {
1184 RowId::slice_from_arrow(&self.row_ids)
1185 }
1186
1187 #[inline]
1191 pub fn row_ids(&self) -> impl ExactSizeIterator<Item = RowId> + '_ {
1192 self.row_ids_slice().iter().copied()
1193 }
1194
1195 #[inline]
1200 pub fn component_row_ids(
1201 &self,
1202 component: ComponentIdentifier,
1203 ) -> impl Iterator<Item = RowId> + '_ + use<'_> {
1204 let Some(list_array) = self.components.get_array(component) else {
1205 return Either::Left(std::iter::empty());
1206 };
1207
1208 let row_ids = self.row_ids();
1209
1210 if let Some(validity) = list_array.nulls() {
1211 Either::Right(Either::Left(
1212 row_ids
1213 .enumerate()
1214 .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
1215 ))
1216 } else {
1217 Either::Right(Either::Right(row_ids))
1218 }
1219 }
1220
1221 #[inline]
1227 pub fn row_id_range(&self) -> Option<(RowId, RowId)> {
1228 if self.is_empty() {
1229 return None;
1230 }
1231
1232 let row_ids = self.row_ids_slice();
1233
1234 #[expect(clippy::unwrap_used)] Some(if self.is_sorted() {
1236 (
1237 row_ids.first().copied().unwrap(),
1238 row_ids.last().copied().unwrap(),
1239 )
1240 } else {
1241 (
1242 row_ids.iter().min().copied().unwrap(),
1243 row_ids.iter().max().copied().unwrap(),
1244 )
1245 })
1246 }
1247
1248 #[inline]
1249 pub fn is_static(&self) -> bool {
1250 self.timelines.is_empty()
1251 }
1252
1253 #[inline]
1254 pub fn timelines(&self) -> &IntMap<TimelineName, TimeColumn> {
1255 &self.timelines
1256 }
1257
1258 #[inline]
1259 pub fn components_identifiers(&self) -> impl Iterator<Item = ComponentIdentifier> + '_ {
1260 self.components.keys().copied()
1261 }
1262
1263 #[inline]
1264 pub fn components(&self) -> &ChunkComponents {
1265 &self.components
1266 }
1267}
1268
1269impl std::fmt::Display for Chunk {
1270 #[inline]
1271 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1272 let batch = self.to_record_batch().map_err(|err| {
1273 re_log::error_once!("couldn't display Chunk: {err}");
1274 std::fmt::Error
1275 })?;
1276 re_arrow_util::format_record_batch_with_width(&batch, f.width(), f.sign_minus()).fmt(f)
1277 }
1278}
1279
1280impl TimeColumn {
1281 #[inline]
1282 pub fn timeline(&self) -> &Timeline {
1283 &self.timeline
1284 }
1285
1286 #[inline]
1287 pub fn name(&self) -> &str {
1288 self.timeline.name()
1289 }
1290
1291 #[inline]
1292 pub fn time_range(&self) -> AbsoluteTimeRange {
1293 self.time_range
1294 }
1295
1296 #[inline]
1297 pub fn times_buffer(&self) -> &ArrowScalarBuffer<i64> {
1298 &self.times
1299 }
1300
1301 #[inline]
1303 pub fn times_array(&self) -> ArrowArrayRef {
1304 self.timeline.typ().make_arrow_array(self.times.clone())
1305 }
1306
1307 #[inline]
1310 pub fn times_raw(&self) -> &[i64] {
1311 self.times.as_ref()
1312 }
1313
1314 #[inline]
1317 pub fn times_nonmin(&self) -> impl DoubleEndedIterator<Item = NonMinI64> + '_ {
1318 self.times_raw()
1319 .iter()
1320 .copied()
1321 .map(NonMinI64::saturating_from_i64)
1322 }
1323
1324 #[inline]
1325 pub fn times(&self) -> impl DoubleEndedIterator<Item = TimeInt> + '_ {
1326 self.times_raw().iter().copied().map(TimeInt::new_temporal)
1327 }
1328
1329 #[inline]
1330 pub fn num_rows(&self) -> usize {
1331 self.times.len()
1332 }
1333
1334 #[inline]
1335 pub fn is_empty(&self) -> bool {
1336 self.num_rows() == 0
1337 }
1338
1339 pub fn time_range_per_component(
1348 &self,
1349 components: &ChunkComponents,
1350 ) -> IntMap<ComponentIdentifier, AbsoluteTimeRange> {
1351 let times = self.times_raw();
1352 components
1353 .iter()
1354 .filter_map(|(component, column)| {
1355 if let Some(validity) = column.list_array.nulls() {
1356 if validity.is_empty() {
1359 return None;
1360 }
1361
1362 let is_dense = validity.null_count() == 0;
1363 if is_dense {
1364 return Some((*component, self.time_range));
1365 }
1366
1367 let mut time_min = TimeInt::MAX;
1368 for (i, time) in times.iter().copied().enumerate() {
1369 if validity.is_valid(i) {
1370 time_min = TimeInt::new_temporal(time);
1371 break;
1372 }
1373 }
1374
1375 let mut time_max = TimeInt::MIN;
1376 for (i, time) in times.iter().copied().enumerate().rev() {
1377 if validity.is_valid(i) {
1378 time_max = TimeInt::new_temporal(time);
1379 break;
1380 }
1381 }
1382
1383 Some((*component, AbsoluteTimeRange::new(time_min, time_max)))
1384 } else {
1385 Some((*component, self.time_range))
1388 }
1389 })
1390 .collect()
1391 }
1392}
1393
impl re_byte_size::SizeBytes for Chunk {
    // Returns the cached heap size, measuring (and caching) it on first call.
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self {
            id,
            entity_path,
            heap_size_bytes,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        let mut size_bytes = heap_size_bytes.load(Ordering::Relaxed);

        // `0` doubles as the "not computed yet" sentinel: measure once, then cache.
        if size_bytes == 0 {
            size_bytes = id.heap_size_bytes()
                + entity_path.heap_size_bytes()
                + is_sorted.heap_size_bytes()
                + row_ids.heap_size_bytes()
                + timelines.heap_size_bytes()
                + components.heap_size_bytes();
            heap_size_bytes.store(size_bytes, Ordering::Relaxed);
        }

        size_bytes
    }
}
1422
impl re_byte_size::SizeBytes for TimeColumn {
    // Sums the heap size of every field; exhaustive destructuring keeps this in sync
    // with the struct definition.
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self {
            timeline,
            times,
            is_sorted,
            time_range,
        } = self;

        timeline.heap_size_bytes()
            + times.heap_size_bytes()
            + is_sorted.heap_size_bytes()
            + time_range.heap_size_bytes()
    }
}
1439
1440impl Chunk {
1443 #[track_caller]
1447 pub fn sanity_check(&self) -> ChunkResult<()> {
1448 re_tracing::profile_function!();
1449
1450 let Self {
1451 id: _,
1452 entity_path: _,
1453 heap_size_bytes,
1454 is_sorted,
1455 row_ids,
1456 timelines,
1457 components,
1458 } = self;
1459
1460 if cfg!(debug_assertions) {
1461 let measured = self.heap_size_bytes();
1462 let advertised = heap_size_bytes.load(Ordering::Relaxed);
1463 if advertised != measured {
1464 return Err(ChunkError::Malformed {
1465 reason: format!(
1466 "Chunk advertises a heap size of {} but we measure {} instead",
1467 re_format::format_bytes(advertised as _),
1468 re_format::format_bytes(measured as _),
1469 ),
1470 });
1471 }
1472 }
1473
1474 {
1476 if *row_ids.data_type() != RowId::arrow_datatype() {
1477 return Err(ChunkError::Malformed {
1478 reason: format!(
1479 "RowId data has the wrong datatype: expected {:?} but got {:?} instead",
1480 RowId::arrow_datatype(),
1481 *row_ids.data_type(),
1482 ),
1483 });
1484 }
1485
1486 #[expect(clippy::collapsible_if)] if cfg!(debug_assertions) {
1488 if *is_sorted != self.is_sorted_uncached() {
1489 return Err(ChunkError::Malformed {
1490 reason: format!(
1491 "Chunk is marked as {}sorted but isn't: {row_ids:?}",
1492 if *is_sorted { "" } else { "un" },
1493 ),
1494 });
1495 }
1496 }
1497 }
1498
1499 for (timeline_name, time_column) in timelines {
1501 if time_column.times.len() != row_ids.len() {
1502 return Err(ChunkError::Malformed {
1503 reason: format!(
1504 "All timelines in a chunk must have the same number of timestamps, matching the number of row IDs. \
1505 Found {} row IDs but {} timestamps for timeline '{timeline_name}'",
1506 row_ids.len(),
1507 time_column.times.len(),
1508 ),
1509 });
1510 }
1511
1512 time_column.sanity_check()?;
1513 }
1514
1515 for (component, column) in components.iter() {
1518 let SerializedComponentColumn {
1519 list_array,
1520 descriptor,
1521 } = column;
1522
1523 if descriptor.component != *component {
1524 return Err(ChunkError::Malformed {
1525 reason: format!(
1526 "Component key & descriptor mismatch. Descriptor: {descriptor:?}. Key: {component:?}",
1527 ),
1528 });
1529 }
1530
1531 if let arrow::datatypes::DataType::List(_field) = list_array.data_type() {
1533 } else {
1536 return Err(ChunkError::Malformed {
1537 reason: format!(
1538 "The inner array in a chunked component batch must be a list, got {:?}",
1539 list_array.data_type(),
1540 ),
1541 });
1542 }
1543
1544 if list_array.len() != row_ids.len() {
1545 return Err(ChunkError::Malformed {
1546 reason: format!(
1547 "All component batches in a chunk must have the same number of rows, matching the number of row IDs. \
1548 Found {} row IDs but {} rows for component batch {component}",
1549 row_ids.len(),
1550 list_array.len(),
1551 ),
1552 });
1553 }
1554
1555 let validity_is_empty = list_array
1556 .nulls()
1557 .is_some_and(|validity| validity.is_empty());
1558 if !self.is_empty() && validity_is_empty {
1559 return Err(ChunkError::Malformed {
1560 reason: format!(
1561 "All component batches in a chunk must contain at least one non-null entry.\
1562 Found a completely empty column for {component}",
1563 ),
1564 });
1565 }
1566 }
1567
1568 Ok(())
1569 }
1570}
1571
1572impl TimeColumn {
1573 #[track_caller]
1577 pub fn sanity_check(&self) -> ChunkResult<()> {
1578 let Self {
1579 timeline: _,
1580 times,
1581 is_sorted,
1582 time_range,
1583 } = self;
1584
1585 let times = times.as_ref();
1586
1587 if cfg!(debug_assertions)
1588 && *is_sorted != times.windows(2).all(|times| times[0] <= times[1])
1589 {
1590 return Err(ChunkError::Malformed {
1591 reason: format!(
1592 "Time column is marked as {}sorted but isn't: {times:?}",
1593 if *is_sorted { "" } else { "un" },
1594 ),
1595 });
1596 }
1597
1598 if cfg!(debug_assertions) {
1599 let is_tight_lower_bound = times.iter().any(|&time| time == time_range.min().as_i64());
1600 let is_tight_upper_bound = times.iter().any(|&time| time == time_range.max().as_i64());
1601 let is_tight_bound = is_tight_lower_bound && is_tight_upper_bound;
1602
1603 if !self.is_empty() && !is_tight_bound {
1604 return Err(ChunkError::Malformed {
1605 reason: "Time column's cached time range isn't a tight bound.".to_owned(),
1606 });
1607 }
1608
1609 for &time in times {
1610 if time < time_range.min().as_i64() || time > time_range.max().as_i64() {
1611 return Err(ChunkError::Malformed {
1612 reason: format!(
1613 "Time column's cached time range is wrong.\
1614 Found a time value of {time} while its time range is {time_range:?}",
1615 ),
1616 });
1617 }
1618
1619 if time == TimeInt::STATIC.as_i64() {
1620 return Err(ChunkError::Malformed {
1621 reason: "A chunk's timeline should never contain a static time value."
1622 .to_owned(),
1623 });
1624 }
1625 }
1626 }
1627
1628 Ok(())
1629 }
1630}