1use std::sync::atomic::{AtomicU64, Ordering};
2
3use ahash::HashMap;
4use anyhow::Context as _;
5use arrow::{
6 array::{
7 Array as ArrowArray, ArrayRef as ArrowArrayRef, FixedSizeBinaryArray,
8 ListArray as ArrowListArray,
9 },
10 buffer::{NullBuffer as ArrowNullBuffer, ScalarBuffer as ArrowScalarBuffer},
11};
12use itertools::{Either, Itertools as _, izip};
13use nohash_hasher::IntMap;
14
15use re_arrow_util::{ArrowArrayDowncastRef as _, widen_binary_arrays};
16use re_byte_size::SizeBytes as _;
17use re_log_types::{
18 AbsoluteTimeRange, EntityPath, NonMinI64, TimeInt, TimeType, Timeline, TimelineName,
19};
20use re_types_core::{
21 ArchetypeName, ComponentDescriptor, ComponentType, DeserializationError, Loggable as _,
22 SerializationError, SerializedComponentColumn,
23};
24
25use crate::{ChunkId, RowId};
26
/// Errors that can occur when building, validating or (de)serializing a [`Chunk`].
#[derive(thiserror::Error, Debug)]
pub enum ChunkError {
    /// The chunk violates one of its internal invariants (see `Chunk::sanity_check`).
    #[error("Detected malformed Chunk: {reason}")]
    Malformed { reason: String },

    /// Propagated low-level arrow error.
    #[error("Arrow: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// An index (row, component, …) was out of range; `kind` names what was indexed.
    #[error("{kind} index out of bounds: {index} (len={len})")]
    IndexOutOfBounds {
        kind: String,
        len: usize,
        index: usize,
    },

    /// Failed to serialize component data.
    #[error("Serialization: {0}")]
    Serialization(#[from] SerializationError),

    /// Failed to deserialize component data.
    #[error("Deserialization: {0}")]
    Deserialization(#[from] DeserializationError),

    // The following variants are transparent wrappers around `re_sorbet` errors:
    #[error(transparent)]
    UnsupportedTimeType(#[from] re_sorbet::UnsupportedTimeType),

    #[error(transparent)]
    WrongDatatypeError(#[from] re_sorbet::WrongDatatypeError),

    #[error(transparent)]
    MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),

    #[error(transparent)]
    InvalidSorbetSchema(#[from] re_sorbet::SorbetError),
}
64
// Compile-time guard: `ChunkError` is embedded in every `ChunkResult`, so keep
// it small — a large error type would bloat every `Result` returned by this crate.
const _: () = assert!(
    std::mem::size_of::<ChunkError>() <= 72,
    "Error type is too large. Try to reduce its size by boxing some of its variants.",
);

/// Convenience alias for results whose error type is [`ChunkError`].
pub type ChunkResult<T> = Result<T, ChunkError>;
71
/// The component columns of a [`Chunk`]: maps each [`ComponentDescriptor`] to its
/// list array, where each list entry holds the component batch for one row.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ChunkComponents(pub IntMap<ComponentDescriptor, ArrowListArray>);
76
77impl ChunkComponents {
78 #[inline]
82 pub fn get_by_component_type(
83 &self,
84 component_type: ComponentType,
85 ) -> impl Iterator<Item = &ArrowListArray> {
86 self.0.iter().filter_map(move |(desc, array)| {
87 (desc.component_type == Some(component_type)).then_some(array)
88 })
89 }
90
91 pub fn ensure_similar(left: &Self, right: &Self) -> anyhow::Result<()> {
99 anyhow::ensure!(left.len() == right.len());
100 for (descr, left_array) in left.iter() {
101 let Some(right_array) = right.get(descr) else {
102 anyhow::bail!("rhs is missing {descr:?}");
103 };
104 let left_array = widen_binary_arrays(left_array);
105 let right_array = widen_binary_arrays(right_array);
106 re_arrow_util::ensure_similar(&left_array.to_data(), &right_array.to_data())
107 .with_context(|| format!("Component {descr:?}"))?;
108 }
109 Ok(())
110 }
111
112 pub fn contains_component(&self, component_descr: &ComponentDescriptor) -> bool {
114 self.0.contains_key(component_descr)
115 }
116
117 pub fn has_component_with_archetype(&self, archetype_name: ArchetypeName) -> bool {
119 self.0
120 .keys()
121 .any(|desc| desc.archetype == Some(archetype_name))
122 }
123}
124
// Let `ChunkComponents` be used directly as the underlying map for read access.
impl std::ops::Deref for ChunkComponents {
    type Target = IntMap<ComponentDescriptor, ArrowListArray>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
133
// Mutable counterpart of the `Deref` impl above.
impl std::ops::DerefMut for ChunkComponents {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
140
141impl FromIterator<(ComponentDescriptor, ArrowListArray)> for ChunkComponents {
142 #[inline]
143 fn from_iter<T: IntoIterator<Item = (ComponentDescriptor, ArrowListArray)>>(iter: T) -> Self {
144 let mut this = Self::default();
145 {
146 for (component_desc, list_array) in iter {
147 this.insert(component_desc, list_array);
148 }
149 }
150 this
151 }
152}
153
154impl FromIterator<SerializedComponentColumn> for ChunkComponents {
155 #[inline]
156 fn from_iter<T: IntoIterator<Item = SerializedComponentColumn>>(iter: T) -> Self {
157 let mut this = Self::default();
158 {
159 for serialized in iter {
160 this.insert(serialized.descriptor, serialized.list_array);
161 }
162 }
163 this
164 }
165}
166
/// Dense, arrow-backed storage for a batch of events belonging to a single entity.
///
/// Each row is identified by a `RowId` and carries one timestamp per timeline plus
/// zero or more component batches.
#[derive(Debug)]
pub struct Chunk {
    /// Unique id of this chunk.
    pub(crate) id: ChunkId,

    /// The entity that all rows in this chunk belong to.
    pub(crate) entity_path: EntityPath,

    /// Lazily-computed cache of the chunk's heap size; `0` means "not computed yet".
    /// Deliberately excluded from equality comparisons.
    pub(crate) heap_size_bytes: AtomicU64,

    /// Whether `row_ids` is ascending — kept in sync with the data (see `sanity_check`).
    pub(crate) is_sorted: bool,

    /// One `RowId` per row, stored as fixed-size binary.
    pub(crate) row_ids: FixedSizeBinaryArray,

    /// One time column per timeline; empty for static chunks.
    pub(crate) timelines: IntMap<TimelineName, TimeColumn>,

    /// The component data, one list array per component descriptor.
    pub(crate) components: ChunkComponents,
}
208
209impl PartialEq for Chunk {
210 #[inline]
211 fn eq(&self, other: &Self) -> bool {
212 let Self {
213 id,
214 entity_path,
215 heap_size_bytes: _,
216 is_sorted,
217 row_ids,
218 timelines,
219 components,
220 } = self;
221
222 *id == other.id
223 && *entity_path == other.entity_path
224 && *is_sorted == other.is_sorted
225 && *row_ids == other.row_ids
226 && *timelines == other.timelines
227 && *components == other.components
228 }
229}
230
impl Chunk {
    /// Returns this chunk with its id replaced by `id`.
    #[must_use]
    #[inline]
    pub fn with_id(mut self, id: ChunkId) -> Self {
        self.id = id;
        self
    }

    /// Checks that two chunks are "similar", for testing purposes: same entity path,
    /// same set of timelines (ignoring wall-clock-driven ones), and similar components.
    pub fn ensure_similar(lhs: &Self, rhs: &Self) -> anyhow::Result<()> {
        let Self {
            id: _,
            entity_path,
            heap_size_bytes: _,
            is_sorted: _,
            row_ids: _,
            timelines,
            components,
        } = lhs;

        anyhow::ensure!(*entity_path == rhs.entity_path);

        // Same timelines, in the same iteration order.
        anyhow::ensure!(timelines.keys().collect_vec() == rhs.timelines.keys().collect_vec());

        for (timeline, left_time_col) in timelines {
            let right_time_col = rhs
                .timelines
                .get(timeline)
                .ok_or_else(|| anyhow::format_err!("right is missing timeline {timeline:?}"))?;
            // Wall-clock driven timelines are expected to differ between runs,
            // so their contents are not compared.
            if timeline == &TimelineName::log_time() {
                continue;
            }
            if timeline == "sim_time" {
                continue;
            }
            anyhow::ensure!(
                left_time_col == right_time_col,
                "Timeline differs: {timeline:?}"
            );
        }

        if entity_path == &EntityPath::properties() {
            anyhow::ensure!(components.len() == rhs.components.len());

            // The recording start time is set at runtime and will never match
            // across two different recordings — exclude it from the comparison.
            let recording_time_descriptor = ComponentDescriptor {
                archetype: Some("rerun.archetypes.RecordingInfo".into()),
                component: "RecordingInfo:start_time".into(),
                component_type: Some("rerun.components.Timestamp".into()),
            };

            let lhs_components = components
                .iter()
                .filter(|&(desc, _list_array)| desc != &recording_time_descriptor)
                .map(|(desc, list_array)| (desc.clone(), list_array.clone()))
                .collect::<IntMap<_, _>>();
            let rhs_components = rhs
                .components
                .iter()
                .filter(|&(desc, _list_array)| desc != &recording_time_descriptor)
                .map(|(desc, list_array)| (desc.clone(), list_array.clone()))
                .collect::<IntMap<_, _>>();

            anyhow::ensure!(lhs_components == rhs_components);
            Ok(())
        } else {
            ChunkComponents::ensure_similar(components, &rhs.components)
        }
    }

    /// Strict equality, spelled out field by field (still ignoring the
    /// `heap_size_bytes` cache). Functionally equivalent to `PartialEq`.
    pub fn are_equal(&self, other: &Self) -> bool {
        let Self {
            id,
            entity_path,
            heap_size_bytes: _,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        // Compare components as maps of borrowed arrays keyed by cloned descriptors.
        let my_components: IntMap<_, _> = components
            .iter()
            .map(|(descr, list_array)| (descr.clone(), list_array))
            .collect();

        let other_components: IntMap<_, _> = other
            .components
            .iter()
            .map(|(descr, list_array)| (descr.clone(), list_array))
            .collect();

        *id == other.id
            && *entity_path == other.entity_path
            && *is_sorted == other.is_sorted
            && row_ids == &other.row_ids
            && *timelines == other.timelines
            && my_components == other_components
    }
}
349
350impl Clone for Chunk {
351 #[inline]
352 fn clone(&self) -> Self {
353 Self {
354 id: self.id,
355 entity_path: self.entity_path.clone(),
356 heap_size_bytes: AtomicU64::new(self.heap_size_bytes.load(Ordering::Relaxed)),
357 is_sorted: self.is_sorted,
358 row_ids: self.row_ids.clone(),
359 timelines: self.timelines.clone(),
360 components: self.components.clone(),
361 }
362 }
363}
364
365impl Chunk {
366 #[inline]
371 pub fn clone_as(&self, id: ChunkId, first_row_id: RowId) -> Self {
372 let row_ids = std::iter::from_fn({
373 let mut row_id = first_row_id;
374 move || {
375 let yielded = row_id;
376 row_id = row_id.next();
377 Some(yielded)
378 }
379 })
380 .take(self.row_ids.len())
381 .collect_vec();
382
383 Self {
384 id,
385 row_ids: RowId::arrow_from_slice(&row_ids),
386 ..self.clone()
387 }
388 }
389
390 #[inline]
392 pub fn into_static(mut self) -> Self {
393 self.timelines.clear();
394 self
395 }
396
397 pub fn zeroed(self) -> Self {
399 let row_ids = vec![RowId::ZERO; self.row_ids.len()];
400
401 let row_ids = RowId::arrow_from_slice(&row_ids);
402
403 Self { row_ids, ..self }
404 }
405
406 #[inline]
415 pub fn time_range_per_component(
416 &self,
417 ) -> IntMap<TimelineName, IntMap<ComponentDescriptor, AbsoluteTimeRange>> {
418 re_tracing::profile_function!();
419
420 self.timelines
421 .iter()
422 .map(|(timeline_name, time_column)| {
423 (
424 *timeline_name,
425 time_column.time_range_per_component(&self.components),
426 )
427 })
428 .collect()
429 }
430
431 #[inline]
437 pub fn num_events_cumulative(&self) -> u64 {
438 self.components
440 .values()
441 .map(|list_array| {
442 list_array.nulls().map_or_else(
443 || list_array.len() as u64,
444 |validity| validity.len() as u64 - validity.null_count() as u64,
445 )
446 })
447 .sum()
448 }
449
450 pub fn num_events_cumulative_per_unique_time(
458 &self,
459 timeline: &TimelineName,
460 ) -> Vec<(TimeInt, u64)> {
461 re_tracing::profile_function!();
462
463 if self.is_static() {
464 return vec![(TimeInt::STATIC, self.num_events_cumulative())];
465 }
466
467 let Some(time_column) = self.timelines().get(timeline) else {
468 return Vec::new();
469 };
470
471 let time_range = time_column.time_range();
472 if time_range.min() == time_range.max() {
473 return vec![(time_range.min(), self.num_events_cumulative())];
474 }
475
476 let counts = if time_column.is_sorted() {
477 self.num_events_cumulative_per_unique_time_sorted(time_column)
478 } else {
479 self.num_events_cumulative_per_unique_time_unsorted(time_column)
480 };
481
482 debug_assert!(
483 counts
484 .iter()
485 .tuple_windows::<(_, _)>()
486 .all(|((time1, _), (time2, _))| time1 < time2)
487 );
488
489 counts
490 }
491
492 fn num_events_cumulative_per_unique_time_sorted(
493 &self,
494 time_column: &TimeColumn,
495 ) -> Vec<(TimeInt, u64)> {
496 re_tracing::profile_function!();
497
498 debug_assert!(time_column.is_sorted());
499
500 let mut counts_raw = vec![0u64; self.num_rows()];
505 {
506 self.components.values().for_each(|list_array| {
507 if let Some(validity) = list_array.nulls() {
508 validity
509 .iter()
510 .enumerate()
511 .for_each(|(i, is_valid)| counts_raw[i] += is_valid as u64);
512 } else {
513 for count in &mut counts_raw {
514 *count += 1;
515 }
516 }
517 });
518 }
519
520 let mut counts = Vec::with_capacity(counts_raw.len());
521
522 let Some(mut cur_time) = time_column.times().next() else {
523 return Vec::new();
524 };
525 let mut cur_count = 0;
526 izip!(time_column.times(), counts_raw).for_each(|(time, count)| {
527 if time == cur_time {
528 cur_count += count;
529 } else {
530 counts.push((cur_time, cur_count));
531 cur_count = count;
532 cur_time = time;
533 }
534 });
535
536 if counts.last().map(|(time, _)| *time) != Some(cur_time) {
537 counts.push((cur_time, cur_count));
538 }
539
540 counts
541 }
542
543 fn num_events_cumulative_per_unique_time_unsorted(
544 &self,
545 time_column: &TimeColumn,
546 ) -> Vec<(TimeInt, u64)> {
547 re_tracing::profile_function!();
548
549 debug_assert!(!time_column.is_sorted());
550
551 let result_unordered =
554 self.components
555 .values()
556 .fold(HashMap::default(), |acc, list_array| {
557 if let Some(validity) = list_array.nulls() {
558 time_column.times().zip(validity.iter()).fold(
559 acc,
560 |mut acc, (time, is_valid)| {
561 *acc.entry(time).or_default() += is_valid as u64;
562 acc
563 },
564 )
565 } else {
566 time_column.times().fold(acc, |mut acc, time| {
567 *acc.entry(time).or_default() += 1;
568 acc
569 })
570 }
571 });
572
573 let mut result = result_unordered.into_iter().collect_vec();
574 result.sort_by_key(|val| val.0);
575 result
576 }
577
578 #[inline]
584 pub fn num_events_for_component(
585 &self,
586 component_descriptor: &ComponentDescriptor,
587 ) -> Option<u64> {
588 self.components.get(component_descriptor).map(|list_array| {
590 list_array.nulls().map_or_else(
591 || list_array.len() as u64,
592 |validity| validity.len() as u64 - validity.null_count() as u64,
593 )
594 })
595 }
596
597 pub fn row_id_range_per_component(&self) -> IntMap<ComponentDescriptor, (RowId, RowId)> {
606 re_tracing::profile_function!();
607
608 let row_ids = self.row_ids().collect_vec();
609
610 if self.is_sorted() {
611 self.components
612 .iter()
613 .filter_map(|(component_desc, list_array)| {
614 let mut row_id_min = None;
615 let mut row_id_max = None;
616
617 for (i, &row_id) in row_ids.iter().enumerate() {
618 if list_array.is_valid(i) {
619 row_id_min = Some(row_id);
620 }
621 }
622 for (i, &row_id) in row_ids.iter().enumerate().rev() {
623 if list_array.is_valid(i) {
624 row_id_max = Some(row_id);
625 }
626 }
627
628 Some((component_desc.clone(), (row_id_min?, row_id_max?)))
629 })
630 .collect()
631 } else {
632 self.components
633 .iter()
634 .filter_map(|(component_desc, list_array)| {
635 let mut row_id_min = Some(RowId::MAX);
636 let mut row_id_max = Some(RowId::ZERO);
637
638 for (i, &row_id) in row_ids.iter().enumerate() {
639 if list_array.is_valid(i) && Some(row_id) > row_id_min {
640 row_id_min = Some(row_id);
641 }
642 }
643 for (i, &row_id) in row_ids.iter().enumerate().rev() {
644 if list_array.is_valid(i) && Some(row_id) < row_id_max {
645 row_id_max = Some(row_id);
646 }
647 }
648
649 Some((component_desc.clone(), (row_id_min?, row_id_max?)))
650 })
651 .collect()
652 }
653 }
654}
655
/// A single column of timestamps for one timeline, one entry per chunk row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeColumn {
    /// The timeline these timestamps belong to.
    pub(crate) timeline: Timeline,

    /// Raw `i64` timestamps; their meaning depends on the timeline's `TimeType`.
    pub(crate) times: ArrowScalarBuffer<i64>,

    /// Whether `times` is ascending — kept in sync with the data (see `sanity_check`).
    pub(crate) is_sorted: bool,

    /// Cached min/max of `times`; must always be a tight bound (see `sanity_check`).
    pub(crate) time_range: AbsoluteTimeRange,
}
685
686#[derive(Debug, thiserror::Error)]
688pub enum TimeColumnError {
689 #[error("Time columns had nulls, but should be dense")]
690 ContainsNulls,
691
692 #[error("Unsupported data type : {0}")]
693 UnsupportedDataType(arrow::datatypes::DataType),
694}
695
696impl Chunk {
697 pub fn new(
707 id: ChunkId,
708 entity_path: EntityPath,
709 is_sorted: Option<bool>,
710 row_ids: FixedSizeBinaryArray,
711 timelines: IntMap<TimelineName, TimeColumn>,
712 components: ChunkComponents,
713 ) -> ChunkResult<Self> {
714 let mut chunk = Self {
715 id,
716 entity_path,
717 heap_size_bytes: AtomicU64::new(0),
718 is_sorted: false,
719 row_ids,
720 timelines,
721 components,
722 };
723
724 chunk.is_sorted = is_sorted.unwrap_or_else(|| chunk.is_sorted_uncached());
725
726 chunk.sanity_check()?;
727
728 Ok(chunk)
729 }
730
731 pub fn from_native_row_ids(
741 id: ChunkId,
742 entity_path: EntityPath,
743 is_sorted: Option<bool>,
744 row_ids: &[RowId],
745 timelines: IntMap<TimelineName, TimeColumn>,
746 components: ChunkComponents,
747 ) -> ChunkResult<Self> {
748 re_tracing::profile_function!();
749 let row_ids = RowId::arrow_from_slice(row_ids);
750 Self::new(id, entity_path, is_sorted, row_ids, timelines, components)
751 }
752
753 pub fn from_auto_row_ids(
761 id: ChunkId,
762 entity_path: EntityPath,
763 timelines: IntMap<TimelineName, TimeColumn>,
764 components: ChunkComponents,
765 ) -> ChunkResult<Self> {
766 let count = components
767 .iter()
768 .next()
769 .map_or(0, |(_, list_array)| list_array.len());
770
771 let row_ids = std::iter::from_fn({
772 let tuid: re_tuid::Tuid = *id;
773 let mut row_id = RowId::from_tuid(tuid.next());
774 move || {
775 let yielded = row_id;
776 row_id = row_id.next();
777 Some(yielded)
778 }
779 })
780 .take(count)
781 .collect_vec();
782
783 Self::from_native_row_ids(id, entity_path, Some(true), &row_ids, timelines, components)
784 }
785
786 #[inline]
790 pub fn new_static(
791 id: ChunkId,
792 entity_path: EntityPath,
793 is_sorted: Option<bool>,
794 row_ids: FixedSizeBinaryArray,
795 components: ChunkComponents,
796 ) -> ChunkResult<Self> {
797 Self::new(
798 id,
799 entity_path,
800 is_sorted,
801 row_ids,
802 Default::default(),
803 components,
804 )
805 }
806
807 #[inline]
808 pub fn empty(id: ChunkId, entity_path: EntityPath) -> Self {
809 Self {
810 id,
811 entity_path,
812 heap_size_bytes: Default::default(),
813 is_sorted: true,
814 row_ids: RowId::arrow_from_slice(&[]),
815 timelines: Default::default(),
816 components: Default::default(),
817 }
818 }
819
820 #[inline]
826 pub fn add_component(
827 &mut self,
828 component_desc: ComponentDescriptor,
829 list_array: ArrowListArray,
830 ) -> ChunkResult<()> {
831 self.components.insert(component_desc, list_array);
832 self.sanity_check()
833 }
834
835 #[inline]
841 pub fn add_timeline(&mut self, chunk_timeline: TimeColumn) -> ChunkResult<()> {
842 self.timelines
843 .insert(*chunk_timeline.timeline.name(), chunk_timeline);
844 self.sanity_check()
845 }
846}
847
impl TimeColumn {
    /// Creates a new [`TimeColumn`].
    ///
    /// * `is_sorted`: pass `Some(_)` if sortedness is already known, otherwise it
    ///   is computed from the data (O(n)).
    ///
    /// The time range is computed eagerly and cached.
    pub fn new(is_sorted: Option<bool>, timeline: Timeline, times: ArrowScalarBuffer<i64>) -> Self {
        re_tracing::profile_function_if!(1000 < times.len(), format!("{} times", times.len()));

        let time_slice = times.as_ref();

        let is_sorted =
            is_sorted.unwrap_or_else(|| time_slice.windows(2).all(|times| times[0] <= times[1]));

        let time_range = if is_sorted {
            // Sorted: the range is just the first and last values.
            // NOTE: an empty column falls back to the degenerate [MIN, MAX] range.
            let min_time = time_slice
                .first()
                .copied()
                .map_or(TimeInt::MIN, TimeInt::new_temporal);
            let max_time = time_slice
                .last()
                .copied()
                .map_or(TimeInt::MAX, TimeInt::new_temporal);
            AbsoluteTimeRange::new(min_time, max_time)
        } else {
            // Unsorted: full scan for the extrema.
            let min_time = time_slice
                .iter()
                .min()
                .copied()
                .map_or(TimeInt::MIN, TimeInt::new_temporal);
            let max_time = time_slice
                .iter()
                .max()
                .copied()
                .map_or(TimeInt::MAX, TimeInt::new_temporal);
            AbsoluteTimeRange::new(min_time, max_time)
        };

        Self {
            timeline,
            times,
            is_sorted,
            time_range,
        }
    }

    /// Creates a sequence-typed time column.
    ///
    /// Illegal values are clamped to the minimum legal value, with an error logged.
    pub fn new_sequence(
        name: impl Into<re_log_types::TimelineName>,
        times: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec: Vec<_> = times.into_iter().map(|t| {
            let t = t.into();
            TimeInt::try_from(t)
                .unwrap_or_else(|_| {
                    re_log::error!(
                        illegal_value = t,
                        new_value = TimeInt::MIN.as_i64(),
                        "TimeColumn::new_sequence() called with illegal value - clamped to minimum legal value"
                    );
                    TimeInt::MIN
                })
                .as_i64()
        }).collect();

        Self::new(
            None,
            Timeline::new_sequence(name.into()),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Creates a duration-typed time column from seconds.
    ///
    /// Values are converted to nanoseconds and saturated to the valid range,
    /// with a warning logged on overflow.
    pub fn new_duration_secs(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        let time_vec = seconds.into_iter().map(|seconds| {
            let seconds = seconds.into();
            let nanos = (1e9 * seconds).round();
            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
            // Round-trip check: detects both out-of-range values and precision loss.
            if clamped.get() as f64 != nanos {
                re_log::warn!(
                    illegal_value = nanos,
                    new_value = clamped.get(),
                    "TimeColumn::new_duration_secs() called with out-of-range value. Clamped to valid range."
                );
            }
            clamped.get()
        }).collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::DurationNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Creates a timestamp-typed time column from seconds since the Unix epoch.
    ///
    /// Values are converted to nanoseconds and saturated to the valid range,
    /// with a warning logged on overflow.
    pub fn new_timestamp_secs_since_epoch(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        let time_vec = seconds.into_iter().map(|seconds| {
            let seconds = seconds.into();
            let nanos = (1e9 * seconds).round();
            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
            if clamped.get() as f64 != nanos {
                re_log::warn!(
                    illegal_value = nanos,
                    new_value = clamped.get(),
                    "TimeColumn::new_timestamp_secs_since_epoch() called with out-of-range value. Clamped to valid range."
                );
            }
            clamped.get()
        }).collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::TimestampNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Deprecated alias for [`TimeColumn::new_duration_secs`].
    #[deprecated = "Use `TimeColumn::new_duration_secs` or `new_timestamp_secs_since_epoch` instead"]
    pub fn new_seconds(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        Self::new_duration_secs(name, seconds)
    }

    /// Creates a duration-typed time column from nanoseconds.
    ///
    /// Illegal values are clamped to the minimum legal value, with an error logged.
    pub fn new_duration_nanos(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec = nanos
            .into_iter()
            .map(|nanos| {
                let nanos = nanos.into();
                NonMinI64::new(nanos)
                    .unwrap_or_else(|| {
                        re_log::error!(
                            illegal_value = nanos,
                            new_value = TimeInt::MIN.as_i64(),
                            "TimeColumn::new_duration_nanos() called with illegal value - clamped to minimum legal value"
                        );
                        NonMinI64::MIN
                    })
                    .get()
            })
            .collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::DurationNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Creates a timestamp-typed time column from nanoseconds since the Unix epoch.
    ///
    /// Illegal values are clamped to the minimum legal value, with an error logged.
    pub fn new_timestamp_nanos_since_epoch(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec = nanos
            .into_iter()
            .map(|nanos| {
                let nanos = nanos.into();
                NonMinI64::new(nanos)
                    .unwrap_or_else(|| {
                        re_log::error!(
                            illegal_value = nanos,
                            new_value = TimeInt::MIN.as_i64(),
                            "TimeColumn::new_timestamp_nanos_since_epoch() called with illegal value - clamped to minimum legal value"
                        );
                        NonMinI64::MIN
                    })
                    .get()
            })
            .collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::TimestampNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Deprecated alias for [`TimeColumn::new_duration_nanos`].
    #[deprecated = "Use `TimeColumn::new_duration_nanos` or `new_timestamp_nanos_since_epoch` instead"]
    pub fn new_nanos(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        Self::new_duration_nanos(name, nanos)
    }

    /// Reads a dense (null-free) arrow array as a buffer of raw `i64` timestamps.
    ///
    /// Errors if the array contains nulls or has an unsupported datatype.
    pub fn read_array(array: &dyn ArrowArray) -> Result<ArrowScalarBuffer<i64>, TimeColumnError> {
        if array.null_count() > 0 {
            Err(TimeColumnError::ContainsNulls)
        } else {
            Self::read_nullable_array(array).map(|(times, _nulls)| times)
        }
    }

    /// Reads an arrow array as raw `i64` timestamps plus its optional validity bitmap.
    ///
    /// Supports `Int64`, `Timestamp(ns)`, `Time64(ns)` and `Duration(ns)` arrays.
    pub fn read_nullable_array(
        array: &dyn ArrowArray,
    ) -> Result<(ArrowScalarBuffer<i64>, Option<ArrowNullBuffer>), TimeColumnError> {
        if let Some(times) = array.downcast_array_ref::<arrow::array::Int64Array>() {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::TimestampNanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::Time64NanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::DurationNanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else {
            Err(TimeColumnError::UnsupportedDataType(
                array.data_type().clone(),
            ))
        }
    }
}
1091
impl Chunk {
    /// The unique id of this chunk.
    #[inline]
    pub fn id(&self) -> ChunkId {
        self.id
    }

    /// The entity all rows in this chunk belong to.
    #[inline]
    pub fn entity_path(&self) -> &EntityPath {
        &self.entity_path
    }

    /// Total number of columns: 1 control column (row-ids) + time columns + component columns.
    #[inline]
    pub fn num_columns(&self) -> usize {
        let Self {
            id: _,
            entity_path: _, // not an actual column
            heap_size_bytes: _,
            is_sorted: _,
            row_ids: _,
            timelines,
            components,
        } = self;

        1 + timelines.len() + components.len()
    }

    /// Number of control columns; currently always 1 (the row-id column).
    #[inline]
    pub fn num_controls(&self) -> usize {
        _ = self;
        1
    }

    /// Number of time columns.
    #[inline]
    pub fn num_timelines(&self) -> usize {
        self.timelines.len()
    }

    /// Number of component columns.
    #[inline]
    pub fn num_components(&self) -> usize {
        self.components.len()
    }

    /// Number of rows in the chunk.
    #[inline]
    pub fn num_rows(&self) -> usize {
        self.row_ids.len()
    }

    /// Whether the chunk has zero rows.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// The row-id column as its raw arrow representation.
    #[inline]
    pub fn row_ids_array(&self) -> &FixedSizeBinaryArray {
        &self.row_ids
    }

    /// The row-ids as a native slice (zero-copy reinterpretation of the arrow data).
    #[inline]
    pub fn row_ids_slice(&self) -> &[RowId] {
        RowId::slice_from_arrow(&self.row_ids)
    }

    /// Iterates over all row-ids, in storage order.
    #[inline]
    pub fn row_ids(&self) -> impl ExactSizeIterator<Item = RowId> + '_ {
        self.row_ids_slice().iter().copied()
    }

    /// Iterates over the row-ids of rows where the given component has data.
    ///
    /// Yields every row-id if the column has no validity bitmap; yields nothing
    /// if the component column doesn't exist at all.
    #[inline]
    pub fn component_row_ids(
        &self,
        component_descriptor: &ComponentDescriptor,
    ) -> impl Iterator<Item = RowId> + '_ + use<'_> {
        let Some(list_array) = self.components.get(component_descriptor) else {
            return Either::Left(std::iter::empty());
        };

        let row_ids = self.row_ids();

        if let Some(validity) = list_array.nulls() {
            Either::Right(Either::Left(
                row_ids
                    .enumerate()
                    .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
            ))
        } else {
            Either::Right(Either::Right(row_ids))
        }
    }

    /// The smallest and largest row-id in the chunk, or `None` if the chunk is empty.
    #[inline]
    pub fn row_id_range(&self) -> Option<(RowId, RowId)> {
        if self.is_empty() {
            return None;
        }

        let row_ids = self.row_ids_slice();

        // The `unwrap`s cannot fail: the chunk was just checked to be non-empty.
        #[expect(clippy::unwrap_used)]
        Some(if self.is_sorted() {
            (
                row_ids.first().copied().unwrap(),
                row_ids.last().copied().unwrap(),
            )
        } else {
            (
                row_ids.iter().min().copied().unwrap(),
                row_ids.iter().max().copied().unwrap(),
            )
        })
    }

    /// Whether this is a static chunk, i.e. one with no time columns at all.
    #[inline]
    pub fn is_static(&self) -> bool {
        self.timelines.is_empty()
    }

    /// All time columns, keyed by timeline name.
    #[inline]
    pub fn timelines(&self) -> &IntMap<TimelineName, TimeColumn> {
        &self.timelines
    }

    /// Iterates over the descriptors of all component columns (cloned).
    #[inline]
    pub fn component_descriptors(&self) -> impl Iterator<Item = ComponentDescriptor> + '_ {
        self.components.keys().cloned()
    }

    /// All component columns.
    #[inline]
    pub fn components(&self) -> &ChunkComponents {
        &self.components
    }
}
1238
1239impl std::fmt::Display for Chunk {
1240 #[inline]
1241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1242 let batch = self.to_record_batch().map_err(|err| {
1243 re_log::error_once!("couldn't display Chunk: {err}");
1244 std::fmt::Error
1245 })?;
1246 re_format_arrow::format_record_batch_with_width(&batch, f.width(), f.sign_minus()).fmt(f)
1247 }
1248}
1249
impl TimeColumn {
    /// The timeline this column belongs to.
    #[inline]
    pub fn timeline(&self) -> &Timeline {
        &self.timeline
    }

    /// The name of this column's timeline.
    #[inline]
    pub fn name(&self) -> &str {
        self.timeline.name()
    }

    /// The cached min/max time range of this column.
    #[inline]
    pub fn time_range(&self) -> AbsoluteTimeRange {
        self.time_range
    }

    /// The raw timestamps as an arrow scalar buffer.
    #[inline]
    pub fn times_buffer(&self) -> &ArrowScalarBuffer<i64> {
        &self.times
    }

    /// The timestamps as an arrow array of the timeline's native arrow datatype.
    #[inline]
    pub fn times_array(&self) -> ArrowArrayRef {
        self.timeline.typ().make_arrow_array(self.times.clone())
    }

    /// The timestamps as a raw `i64` slice.
    #[inline]
    pub fn times_raw(&self) -> &[i64] {
        self.times.as_ref()
    }

    /// Iterates over the timestamps as `NonMinI64`s (saturating conversion).
    #[inline]
    pub fn times_nonmin(&self) -> impl DoubleEndedIterator<Item = NonMinI64> + '_ {
        self.times_raw()
            .iter()
            .copied()
            .map(NonMinI64::saturating_from_i64)
    }

    /// Iterates over the timestamps as `TimeInt`s.
    #[inline]
    pub fn times(&self) -> impl DoubleEndedIterator<Item = TimeInt> + '_ {
        self.times_raw().iter().copied().map(TimeInt::new_temporal)
    }

    /// Number of rows (timestamps) in this column.
    #[inline]
    pub fn num_rows(&self) -> usize {
        self.times.len()
    }

    /// Whether the column has zero rows.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// Computes, per component column, the time range covered by that component's
    /// non-null rows on this timeline.
    ///
    /// Columns with an empty validity bitmap are skipped entirely.
    pub fn time_range_per_component(
        &self,
        components: &ChunkComponents,
    ) -> IntMap<ComponentDescriptor, AbsoluteTimeRange> {
        let times = self.times_raw();
        components
            .iter()
            .filter_map(|(component_desc, list_array)| {
                if let Some(validity) = list_array.nulls() {
                    // An empty validity bitmap means no entries at all: skip.
                    if validity.is_empty() {
                        return None;
                    }

                    // Dense column (no nulls): the component covers the whole column range.
                    let is_dense = validity.null_count() == 0;
                    if is_dense {
                        return Some((component_desc.clone(), self.time_range));
                    }

                    // Sparse column: scan from the front for the first valid row…
                    let mut time_min = TimeInt::MAX;
                    for (i, time) in times.iter().copied().enumerate() {
                        if validity.is_valid(i) {
                            time_min = TimeInt::new_temporal(time);
                            break;
                        }
                    }

                    // …and from the back for the last valid row.
                    let mut time_max = TimeInt::MIN;
                    for (i, time) in times.iter().copied().enumerate().rev() {
                        if validity.is_valid(i) {
                            time_max = TimeInt::new_temporal(time);
                            break;
                        }
                    }

                    Some((
                        component_desc.clone(),
                        AbsoluteTimeRange::new(time_min, time_max),
                    ))
                } else {
                    // No validity bitmap: every row is valid, use the full range.
                    Some((component_desc.clone(), self.time_range))
                }
            })
            .collect()
    }
}
1366
impl re_byte_size::SizeBytes for Chunk {
    /// Total heap size of the chunk, lazily computed and cached.
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self {
            id,
            entity_path,
            heap_size_bytes,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        // `heap_size_bytes` doubles as a lazily-filled cache: `0` means "not computed yet".
        let mut size_bytes = heap_size_bytes.load(Ordering::Relaxed);

        if size_bytes == 0 {
            // NOTE: concurrent callers may both compute this, but they will
            // store the same value, so the race is benign.
            size_bytes = id.heap_size_bytes()
                + entity_path.heap_size_bytes()
                + is_sorted.heap_size_bytes()
                + row_ids.heap_size_bytes()
                + timelines.heap_size_bytes()
                + components.heap_size_bytes();
            heap_size_bytes.store(size_bytes, Ordering::Relaxed);
        }

        size_bytes
    }
}
1395
1396impl re_byte_size::SizeBytes for TimeColumn {
1397 #[inline]
1398 fn heap_size_bytes(&self) -> u64 {
1399 let Self {
1400 timeline,
1401 times,
1402 is_sorted,
1403 time_range,
1404 } = self;
1405
1406 timeline.heap_size_bytes()
1407 + times.heap_size_bytes() + is_sorted.heap_size_bytes()
1409 + time_range.heap_size_bytes()
1410 }
1411}
1412
impl Chunk {
    /// Validates the chunk's internal invariants.
    ///
    /// Checks (some only in debug builds): cached heap size, row-id datatype,
    /// sortedness flag, per-timeline and per-component row counts, component
    /// array shapes, and non-empty validity bitmaps.
    ///
    /// Returns [`ChunkError::Malformed`] describing the first violation found.
    #[track_caller]
    pub fn sanity_check(&self) -> ChunkResult<()> {
        re_tracing::profile_function!();

        let Self {
            id: _,
            entity_path: _,
            heap_size_bytes,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        // Debug-only: the cached heap size must match a fresh measurement.
        if cfg!(debug_assertions) {
            let measured = self.heap_size_bytes();
            let advertised = heap_size_bytes.load(Ordering::Relaxed);
            if advertised != measured {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "Chunk advertises a heap size of {} but we measure {} instead",
                        re_format::format_bytes(advertised as _),
                        re_format::format_bytes(measured as _),
                    ),
                });
            }
        }

        // Row-id column checks.
        {
            if *row_ids.data_type() != RowId::arrow_datatype() {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "RowId data has the wrong datatype: expected {:?} but got {:?} instead",
                        RowId::arrow_datatype(),
                        *row_ids.data_type(),
                    ),
                });
            }

            // Debug-only: the sortedness flag must match the actual data.
            #[expect(clippy::collapsible_if)]
            if cfg!(debug_assertions) {
                if *is_sorted != self.is_sorted_uncached() {
                    return Err(ChunkError::Malformed {
                        reason: format!(
                            "Chunk is marked as {}sorted but isn't: {row_ids:?}",
                            if *is_sorted { "" } else { "un" },
                        ),
                    });
                }
            }
        }

        // Time column checks: every timeline must have exactly one timestamp per row.
        for (timeline_name, time_column) in timelines {
            if time_column.times.len() != row_ids.len() {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "All timelines in a chunk must have the same number of timestamps, matching the number of row IDs. \
                         Found {} row IDs but {} timestamps for timeline '{timeline_name}'",
                        row_ids.len(),
                        time_column.times.len(),
                    ),
                });
            }

            time_column.sanity_check()?;
        }

        // Component column checks.
        for (component_desc, list_array) in components.iter() {
            if let Some(c) = component_desc.component_type {
                c.sanity_check();
            }

            // Component columns must be list arrays (one list entry per row).
            if let arrow::datatypes::DataType::List(_field) = list_array.data_type() {
            } else {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "The inner array in a chunked component batch must be a list, got {:?}",
                        list_array.data_type(),
                    ),
                });
            }

            if list_array.len() != row_ids.len() {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "All component batches in a chunk must have the same number of rows, matching the number of row IDs. \
                         Found {} row IDs but {} rows for component batch {component_desc}",
                        row_ids.len(),
                        list_array.len(),
                    ),
                });
            }

            // An empty validity bitmap on a non-empty chunk would mean a column
            // with no entries at all, which is not allowed.
            let validity_is_empty = list_array
                .nulls()
                .is_some_and(|validity| validity.is_empty());
            if !self.is_empty() && validity_is_empty {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "All component batches in a chunk must contain at least one non-null entry.\
                         Found a completely empty column for {component_desc}",
                    ),
                });
            }
        }

        Ok(())
    }
}
1534
impl TimeColumn {
    /// Validates the time column's internal invariants.
    ///
    /// Debug-only checks: the sortedness flag matches the data, the cached time
    /// range is a tight bound, every timestamp falls within it, and no timestamp
    /// equals the reserved static sentinel.
    #[track_caller]
    pub fn sanity_check(&self) -> ChunkResult<()> {
        let Self {
            timeline: _,
            times,
            is_sorted,
            time_range,
        } = self;

        let times = times.as_ref();

        // The sortedness flag must match the actual data.
        if cfg!(debug_assertions)
            && *is_sorted != times.windows(2).all(|times| times[0] <= times[1])
        {
            return Err(ChunkError::Malformed {
                reason: format!(
                    "Time column is marked as {}sorted but isn't: {times:?}",
                    if *is_sorted { "" } else { "un" },
                ),
            });
        }

        if cfg!(debug_assertions) {
            // The cached range must be achieved by actual values (tight bound)…
            let is_tight_lower_bound = times.iter().any(|&time| time == time_range.min().as_i64());
            let is_tight_upper_bound = times.iter().any(|&time| time == time_range.max().as_i64());
            let is_tight_bound = is_tight_lower_bound && is_tight_upper_bound;

            if !self.is_empty() && !is_tight_bound {
                return Err(ChunkError::Malformed {
                    reason: "Time column's cached time range isn't a tight bound.".to_owned(),
                });
            }

            for &time in times {
                // …and every value must fall within the cached range.
                if time < time_range.min().as_i64() || time > time_range.max().as_i64() {
                    return Err(ChunkError::Malformed {
                        reason: format!(
                            "Time column's cached time range is wrong.\
                             Found a time value of {time} while its time range is {time_range:?}",
                        ),
                    });
                }

                // The static sentinel is reserved and must never appear in a timeline.
                if time == TimeInt::STATIC.as_i64() {
                    return Err(ChunkError::Malformed {
                        reason: "A chunk's timeline should never contain a static time value."
                            .to_owned(),
                    });
                }
            }
        }

        Ok(())
    }
}