1use std::sync::atomic::{AtomicU64, Ordering};
2
3use ahash::HashMap;
4use anyhow::Context as _;
5use arrow::{
6 array::{
7 Array as ArrowArray, ArrayRef as ArrowArrayRef, FixedSizeBinaryArray,
8 ListArray as ArrowListArray,
9 },
10 buffer::{NullBuffer as ArrowNullBuffer, ScalarBuffer as ArrowScalarBuffer},
11};
12use itertools::{Either, Itertools as _, izip};
13use nohash_hasher::IntMap;
14
15use re_arrow_util::ArrowArrayDowncastRef as _;
16use re_byte_size::SizeBytes as _;
17use re_log_types::{
18 EntityPath, NonMinI64, ResolvedTimeRange, TimeInt, TimeType, Timeline, TimelineName,
19};
20use re_types_core::{
21 ArchetypeName, ComponentDescriptor, ComponentType, DeserializationError, Loggable as _,
22 SerializationError, SerializedComponentColumn,
23};
24
25use crate::{ChunkId, RowId};
26
/// Errors that can occur when building or validating a [`Chunk`].
#[derive(thiserror::Error, Debug)]
pub enum ChunkError {
    /// One of the chunk's internal invariants was violated (see `Chunk::sanity_check`).
    #[error("Detected malformed Chunk: {reason}")]
    Malformed { reason: String },

    /// An error bubbled up from the underlying arrow implementation.
    #[error("Arrow: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// An index was out of bounds; `kind` describes what was being indexed.
    #[error("{kind} index out of bounds: {index} (len={len})")]
    IndexOutOfBounds {
        kind: String,
        len: usize,
        index: usize,
    },

    /// Failed to serialize component data.
    #[error("Serialization: {0}")]
    Serialization(#[from] SerializationError),

    /// Failed to deserialize component data.
    #[error("Deserialization: {0}")]
    Deserialization(#[from] DeserializationError),

    /// A time column used an unsupported time type (forwarded from `re_sorbet`).
    #[error(transparent)]
    UnsupportedTimeType(#[from] re_sorbet::UnsupportedTimeType),

    /// A column had an unexpected arrow datatype (forwarded from `re_sorbet`).
    #[error(transparent)]
    WrongDatatypeError(#[from] re_sorbet::WrongDatatypeError),

    /// Chunk schemas that were expected to match didn't (forwarded from `re_sorbet`).
    #[error(transparent)]
    MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),

    /// The sorbet schema itself was invalid (forwarded from `re_sorbet`).
    #[error(transparent)]
    InvalidSorbetSchema(#[from] re_sorbet::SorbetError),
}
64
/// Convenient short-hand for a `Result` whose error type is [`ChunkError`].
pub type ChunkResult<T> = Result<T, ChunkError>;
66
/// The component columns of a [`Chunk`].
///
/// Maps each [`ComponentDescriptor`] to its column of data, where each column is a
/// list-array with one (possibly null) list entry per chunk row.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ChunkComponents(pub IntMap<ComponentDescriptor, ArrowListArray>);
71
72impl ChunkComponents {
73 #[inline]
77 pub fn get_by_component_type(
78 &self,
79 component_type: ComponentType,
80 ) -> impl Iterator<Item = &ArrowListArray> {
81 self.0.iter().filter_map(move |(desc, array)| {
82 (desc.component_type == Some(component_type)).then_some(array)
83 })
84 }
85
86 pub fn ensure_similar(left: &Self, right: &Self) -> anyhow::Result<()> {
94 anyhow::ensure!(left.len() == right.len());
95 for (descr, left_array) in left.iter() {
96 let Some(right_array) = right.get(descr) else {
97 anyhow::bail!("rhs is missing {descr:?}");
98 };
99 re_arrow_util::ensure_similar(&left_array.to_data(), &right_array.to_data())
100 .with_context(|| format!("Component {descr:?}"))?;
101 }
102 Ok(())
103 }
104
105 pub fn contains_component(&self, component_descr: &ComponentDescriptor) -> bool {
107 self.0.contains_key(component_descr)
108 }
109
110 pub fn has_component_with_archetype(&self, archetype_name: ArchetypeName) -> bool {
112 self.0
113 .keys()
114 .any(|desc| desc.archetype == Some(archetype_name))
115 }
116}
117
// Deref to the inner map so callers can use all of `IntMap`'s read-only API
// (`get`, `iter`, `keys`, `len`, …) directly on `ChunkComponents`.
impl std::ops::Deref for ChunkComponents {
    type Target = IntMap<ComponentDescriptor, ArrowListArray>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
126
// Mutable counterpart: exposes `IntMap`'s mutating API (`insert`, `remove`, …).
impl std::ops::DerefMut for ChunkComponents {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
133
134impl FromIterator<(ComponentDescriptor, ArrowListArray)> for ChunkComponents {
135 #[inline]
136 fn from_iter<T: IntoIterator<Item = (ComponentDescriptor, ArrowListArray)>>(iter: T) -> Self {
137 let mut this = Self::default();
138 {
139 for (component_desc, list_array) in iter {
140 this.insert(component_desc, list_array);
141 }
142 }
143 this
144 }
145}
146
147impl FromIterator<SerializedComponentColumn> for ChunkComponents {
148 #[inline]
149 fn from_iter<T: IntoIterator<Item = SerializedComponentColumn>>(iter: T) -> Self {
150 let mut this = Self::default();
151 {
152 for serialized in iter {
153 this.insert(serialized.descriptor, serialized.list_array);
154 }
155 }
156 this
157 }
158}
159
/// Dense storage of a batch of rows of data for a single entity, in arrow form.
#[derive(Debug)]
pub struct Chunk {
    /// Unique identifier of this chunk.
    pub(crate) id: ChunkId,

    /// The entity that every row in this chunk refers to.
    pub(crate) entity_path: EntityPath,

    /// Lazily-computed cache of this chunk's heap size, in bytes.
    ///
    /// 0 means "not computed yet" — see the `SizeBytes` impl further down, which fills
    /// this in on first query.
    pub(crate) heap_size_bytes: AtomicU64,

    /// Whether `row_ids` is ascendingly sorted.
    ///
    /// Verified against the actual data by `sanity_check` in debug builds.
    pub(crate) is_sorted: bool,

    /// The row ids, one per row, in arrow form (see [`RowId::arrow_from_slice`]).
    pub(crate) row_ids: FixedSizeBinaryArray,

    /// One time column per timeline this chunk covers; empty for static chunks.
    ///
    /// Each column holds exactly one timestamp per row (enforced by `sanity_check`).
    pub(crate) timelines: IntMap<TimelineName, TimeColumn>,

    /// The component columns; each list-array holds exactly one entry per row
    /// (enforced by `sanity_check`), where a null entry means "no data for this row".
    pub(crate) components: ChunkComponents,
}
201
202impl PartialEq for Chunk {
203 #[inline]
204 fn eq(&self, other: &Self) -> bool {
205 let Self {
206 id,
207 entity_path,
208 heap_size_bytes: _,
209 is_sorted,
210 row_ids,
211 timelines,
212 components,
213 } = self;
214
215 *id == other.id
216 && *entity_path == other.entity_path
217 && *is_sorted == other.is_sorted
218 && *row_ids == other.row_ids
219 && *timelines == other.timelines
220 && *components == other.components
221 }
222}
223
impl Chunk {
    /// Returns this chunk with a new [`ChunkId`]; consumes and returns `self` so it
    /// can be chained builder-style.
    #[must_use]
    #[inline]
    pub fn with_id(mut self, id: ChunkId) -> Self {
        self.id = id;
        self
    }

    /// Checks that two chunks are "similar": same entity path, same set of timelines
    /// (ignoring timelines whose values naturally differ between runs), and similar
    /// component columns.
    ///
    /// Chunk ids, row ids, sortedness flags and cached heap sizes are ignored.
    pub fn ensure_similar(lhs: &Self, rhs: &Self) -> anyhow::Result<()> {
        // Exhaustive destructuring: adding a field to `Chunk` forces this to be revisited.
        let Self {
            id: _,
            entity_path,
            heap_size_bytes: _,
            is_sorted: _,
            row_ids: _,
            timelines,
            components,
        } = lhs;

        anyhow::ensure!(*entity_path == rhs.entity_path);

        anyhow::ensure!(timelines.keys().collect_vec() == rhs.timelines.keys().collect_vec());

        for (timeline, left_time_col) in timelines {
            let right_time_col = rhs
                .timelines
                .get(timeline)
                .ok_or_else(|| anyhow::format_err!("right is missing timeline {timeline:?}"))?;
            // Wall-clock-driven timelines will naturally differ between runs: only their
            // presence is checked (above), not their values.
            if timeline == &TimelineName::log_time() {
                continue;
            }
            if timeline == "sim_time" {
                continue;
            }
            anyhow::ensure!(
                left_time_col == right_time_col,
                "Timeline differs: {timeline:?}"
            );
        }

        if entity_path == &EntityPath::properties() {
            anyhow::ensure!(components.len() == rhs.components.len());

            // The recording start time is set automatically and will differ between runs,
            // so that one column is excluded from the comparison.
            let recording_time_descriptor = ComponentDescriptor {
                archetype: Some("rerun.archetypes.RecordingInfo".into()),
                component: "RecordingInfo:start_time".into(),
                component_type: Some("rerun.components.Timestamp".into()),
            };

            let lhs_components = components
                .iter()
                .filter(|&(desc, _list_array)| (desc != &recording_time_descriptor))
                .map(|(desc, list_array)| (desc.clone(), list_array.clone()))
                .collect::<IntMap<_, _>>();
            let rhs_components = rhs
                .components
                .iter()
                .filter(|&(desc, _list_array)| (desc != &recording_time_descriptor))
                .map(|(desc, list_array)| (desc.clone(), list_array.clone()))
                .collect::<IntMap<_, _>>();

            anyhow::ensure!(lhs_components == rhs_components);
            Ok(())
        } else {
            ChunkComponents::ensure_similar(components, &rhs.components)
        }
    }

    /// Strict equality check, with component columns compared as maps keyed by owned
    /// descriptors.
    ///
    /// NOTE(review): this looks functionally equivalent to the `PartialEq` impl —
    /// confirm whether the distinction is still needed.
    pub fn are_equal(&self, other: &Self) -> bool {
        // Exhaustive destructuring: adding a field to `Chunk` forces this to be revisited.
        let Self {
            id,
            entity_path,
            heap_size_bytes: _,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        let my_components: IntMap<_, _> = components
            .iter()
            .map(|(descr, list_array)| (descr.clone(), list_array))
            .collect();

        let other_components: IntMap<_, _> = other
            .components
            .iter()
            .map(|(descr, list_array)| (descr.clone(), list_array))
            .collect();

        *id == other.id
            && *entity_path == other.entity_path
            && *is_sorted == other.is_sorted
            && *row_ids == other.row_ids
            && *timelines == other.timelines
            && my_components == other_components
    }
}
342
// Hand-written because `AtomicU64` doesn't implement `Clone`: the cached heap size
// is copied over as a plain value; every other field is cloned as usual.
impl Clone for Chunk {
    #[inline]
    fn clone(&self) -> Self {
        Self {
            id: self.id,
            entity_path: self.entity_path.clone(),
            heap_size_bytes: AtomicU64::new(self.heap_size_bytes.load(Ordering::Relaxed)),
            is_sorted: self.is_sorted,
            row_ids: self.row_ids.clone(),
            timelines: self.timelines.clone(),
            components: self.components.clone(),
        }
    }
}
357
358impl Chunk {
359 #[inline]
364 pub fn clone_as(&self, id: ChunkId, first_row_id: RowId) -> Self {
365 let row_ids = std::iter::from_fn({
366 let mut row_id = first_row_id;
367 move || {
368 let yielded = row_id;
369 row_id = row_id.next();
370 Some(yielded)
371 }
372 })
373 .take(self.row_ids.len())
374 .collect_vec();
375
376 Self {
377 id,
378 row_ids: RowId::arrow_from_slice(&row_ids),
379 ..self.clone()
380 }
381 }
382
383 #[inline]
385 pub fn into_static(mut self) -> Self {
386 self.timelines.clear();
387 self
388 }
389
390 pub fn zeroed(self) -> Self {
392 let row_ids = std::iter::repeat(RowId::ZERO)
393 .take(self.row_ids.len())
394 .collect_vec();
395
396 let row_ids = RowId::arrow_from_slice(&row_ids);
397
398 Self { row_ids, ..self }
399 }
400
401 #[inline]
410 pub fn time_range_per_component(
411 &self,
412 ) -> IntMap<TimelineName, IntMap<ComponentDescriptor, ResolvedTimeRange>> {
413 re_tracing::profile_function!();
414
415 self.timelines
416 .iter()
417 .map(|(timeline_name, time_column)| {
418 (
419 *timeline_name,
420 time_column.time_range_per_component(&self.components),
421 )
422 })
423 .collect()
424 }
425
426 #[inline]
432 pub fn num_events_cumulative(&self) -> u64 {
433 self.components
435 .values()
436 .map(|list_array| {
437 list_array.nulls().map_or_else(
438 || list_array.len() as u64,
439 |validity| validity.len() as u64 - validity.null_count() as u64,
440 )
441 })
442 .sum()
443 }
444
445 pub fn num_events_cumulative_per_unique_time(
453 &self,
454 timeline: &TimelineName,
455 ) -> Vec<(TimeInt, u64)> {
456 re_tracing::profile_function!();
457
458 if self.is_static() {
459 return vec![(TimeInt::STATIC, self.num_events_cumulative())];
460 }
461
462 let Some(time_column) = self.timelines().get(timeline) else {
463 return Vec::new();
464 };
465
466 let time_range = time_column.time_range();
467 if time_range.min() == time_range.max() {
468 return vec![(time_range.min(), self.num_events_cumulative())];
469 }
470
471 let counts = if time_column.is_sorted() {
472 self.num_events_cumulative_per_unique_time_sorted(time_column)
473 } else {
474 self.num_events_cumulative_per_unique_time_unsorted(time_column)
475 };
476
477 debug_assert!(
478 counts
479 .iter()
480 .tuple_windows::<(_, _)>()
481 .all(|((time1, _), (time2, _))| time1 < time2)
482 );
483
484 counts
485 }
486
487 fn num_events_cumulative_per_unique_time_sorted(
488 &self,
489 time_column: &TimeColumn,
490 ) -> Vec<(TimeInt, u64)> {
491 re_tracing::profile_function!();
492
493 debug_assert!(time_column.is_sorted());
494
495 let mut counts_raw = vec![0u64; self.num_rows()];
500 {
501 self.components.values().for_each(|list_array| {
502 if let Some(validity) = list_array.nulls() {
503 validity
504 .iter()
505 .enumerate()
506 .for_each(|(i, is_valid)| counts_raw[i] += is_valid as u64);
507 } else {
508 counts_raw.iter_mut().for_each(|count| *count += 1);
509 }
510 });
511 }
512
513 let mut counts = Vec::with_capacity(counts_raw.len());
514
515 let Some(mut cur_time) = time_column.times().next() else {
516 return Vec::new();
517 };
518 let mut cur_count = 0;
519 izip!(time_column.times(), counts_raw).for_each(|(time, count)| {
520 if time == cur_time {
521 cur_count += count;
522 } else {
523 counts.push((cur_time, cur_count));
524 cur_count = count;
525 cur_time = time;
526 }
527 });
528
529 if counts.last().map(|(time, _)| *time) != Some(cur_time) {
530 counts.push((cur_time, cur_count));
531 }
532
533 counts
534 }
535
536 fn num_events_cumulative_per_unique_time_unsorted(
537 &self,
538 time_column: &TimeColumn,
539 ) -> Vec<(TimeInt, u64)> {
540 re_tracing::profile_function!();
541
542 debug_assert!(!time_column.is_sorted());
543
544 let result_unordered =
547 self.components
548 .values()
549 .fold(HashMap::default(), |acc, list_array| {
550 if let Some(validity) = list_array.nulls() {
551 time_column.times().zip(validity.iter()).fold(
552 acc,
553 |mut acc, (time, is_valid)| {
554 *acc.entry(time).or_default() += is_valid as u64;
555 acc
556 },
557 )
558 } else {
559 time_column.times().fold(acc, |mut acc, time| {
560 *acc.entry(time).or_default() += 1;
561 acc
562 })
563 }
564 });
565
566 let mut result = result_unordered.into_iter().collect_vec();
567 result.sort_by_key(|val| val.0);
568 result
569 }
570
571 #[inline]
577 pub fn num_events_for_component(
578 &self,
579 component_descriptor: &ComponentDescriptor,
580 ) -> Option<u64> {
581 self.components.get(component_descriptor).map(|list_array| {
583 list_array.nulls().map_or_else(
584 || list_array.len() as u64,
585 |validity| validity.len() as u64 - validity.null_count() as u64,
586 )
587 })
588 }
589
590 pub fn row_id_range_per_component(&self) -> IntMap<ComponentDescriptor, (RowId, RowId)> {
599 re_tracing::profile_function!();
600
601 let row_ids = self.row_ids().collect_vec();
602
603 if self.is_sorted() {
604 self.components
605 .iter()
606 .filter_map(|(component_desc, list_array)| {
607 let mut row_id_min = None;
608 let mut row_id_max = None;
609
610 for (i, &row_id) in row_ids.iter().enumerate() {
611 if list_array.is_valid(i) {
612 row_id_min = Some(row_id);
613 }
614 }
615 for (i, &row_id) in row_ids.iter().enumerate().rev() {
616 if list_array.is_valid(i) {
617 row_id_max = Some(row_id);
618 }
619 }
620
621 Some((component_desc.clone(), (row_id_min?, row_id_max?)))
622 })
623 .collect()
624 } else {
625 self.components
626 .iter()
627 .filter_map(|(component_desc, list_array)| {
628 let mut row_id_min = Some(RowId::MAX);
629 let mut row_id_max = Some(RowId::ZERO);
630
631 for (i, &row_id) in row_ids.iter().enumerate() {
632 if list_array.is_valid(i) && Some(row_id) > row_id_min {
633 row_id_min = Some(row_id);
634 }
635 }
636 for (i, &row_id) in row_ids.iter().enumerate().rev() {
637 if list_array.is_valid(i) && Some(row_id) < row_id_max {
638 row_id_max = Some(row_id);
639 }
640 }
641
642 Some((component_desc.clone(), (row_id_min?, row_id_max?)))
643 })
644 .collect()
645 }
646 }
647}
648
/// A single dense column of time values, for one timeline of a [`Chunk`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeColumn {
    /// Which timeline this column belongs to (name + time type).
    pub(crate) timeline: Timeline,

    /// The raw time values, one per chunk row, stored as `i64`.
    ///
    /// Must never contain [`TimeInt::STATIC`] (enforced by `sanity_check`).
    pub(crate) times: ArrowScalarBuffer<i64>,

    /// Whether `times` is ascendingly sorted (verified by `sanity_check` in debug builds).
    pub(crate) is_sorted: bool,

    /// Cached min/max over `times`; must be a tight bound (enforced by `sanity_check`).
    pub(crate) time_range: ResolvedTimeRange,
}
678
679#[derive(Debug, thiserror::Error)]
681pub enum TimeColumnError {
682 #[error("Time columns had nulls, but should be dense")]
683 ContainsNulls,
684
685 #[error("Unsupported data type : {0}")]
686 UnsupportedDataType(arrow::datatypes::DataType),
687}
688
689impl Chunk {
690 pub fn new(
700 id: ChunkId,
701 entity_path: EntityPath,
702 is_sorted: Option<bool>,
703 row_ids: FixedSizeBinaryArray,
704 timelines: IntMap<TimelineName, TimeColumn>,
705 components: ChunkComponents,
706 ) -> ChunkResult<Self> {
707 let mut chunk = Self {
708 id,
709 entity_path,
710 heap_size_bytes: AtomicU64::new(0),
711 is_sorted: false,
712 row_ids,
713 timelines,
714 components,
715 };
716
717 chunk.is_sorted = is_sorted.unwrap_or_else(|| chunk.is_sorted_uncached());
718
719 chunk.sanity_check()?;
720
721 Ok(chunk)
722 }
723
724 pub fn from_native_row_ids(
734 id: ChunkId,
735 entity_path: EntityPath,
736 is_sorted: Option<bool>,
737 row_ids: &[RowId],
738 timelines: IntMap<TimelineName, TimeColumn>,
739 components: ChunkComponents,
740 ) -> ChunkResult<Self> {
741 re_tracing::profile_function!();
742 let row_ids = RowId::arrow_from_slice(row_ids);
743 Self::new(id, entity_path, is_sorted, row_ids, timelines, components)
744 }
745
746 pub fn from_auto_row_ids(
754 id: ChunkId,
755 entity_path: EntityPath,
756 timelines: IntMap<TimelineName, TimeColumn>,
757 components: ChunkComponents,
758 ) -> ChunkResult<Self> {
759 let count = components
760 .iter()
761 .next()
762 .map_or(0, |(_, list_array)| list_array.len());
763
764 let row_ids = std::iter::from_fn({
765 let tuid: re_tuid::Tuid = *id;
766 let mut row_id = RowId::from_tuid(tuid.next());
767 move || {
768 let yielded = row_id;
769 row_id = row_id.next();
770 Some(yielded)
771 }
772 })
773 .take(count)
774 .collect_vec();
775
776 Self::from_native_row_ids(id, entity_path, Some(true), &row_ids, timelines, components)
777 }
778
779 #[inline]
783 pub fn new_static(
784 id: ChunkId,
785 entity_path: EntityPath,
786 is_sorted: Option<bool>,
787 row_ids: FixedSizeBinaryArray,
788 components: ChunkComponents,
789 ) -> ChunkResult<Self> {
790 Self::new(
791 id,
792 entity_path,
793 is_sorted,
794 row_ids,
795 Default::default(),
796 components,
797 )
798 }
799
800 #[inline]
801 pub fn empty(id: ChunkId, entity_path: EntityPath) -> Self {
802 Self {
803 id,
804 entity_path,
805 heap_size_bytes: Default::default(),
806 is_sorted: true,
807 row_ids: RowId::arrow_from_slice(&[]),
808 timelines: Default::default(),
809 components: Default::default(),
810 }
811 }
812
813 #[inline]
819 pub fn add_component(
820 &mut self,
821 component_desc: ComponentDescriptor,
822 list_array: ArrowListArray,
823 ) -> ChunkResult<()> {
824 self.components.insert(component_desc, list_array);
825 self.sanity_check()
826 }
827
828 #[inline]
834 pub fn add_timeline(&mut self, chunk_timeline: TimeColumn) -> ChunkResult<()> {
835 self.timelines
836 .insert(*chunk_timeline.timeline.name(), chunk_timeline);
837 self.sanity_check()
838 }
839}
840
impl TimeColumn {
    /// Creates a new [`TimeColumn`].
    ///
    /// * `is_sorted`: pass `Some(_)` if the sortedness is already known, or `None` to
    ///   have it computed from the data.
    ///
    /// The cached time range is derived from the data: first/last entries when sorted,
    /// a full min/max scan otherwise.
    pub fn new(is_sorted: Option<bool>, timeline: Timeline, times: ArrowScalarBuffer<i64>) -> Self {
        re_tracing::profile_function_if!(1000 < times.len(), format!("{} times", times.len()));

        let time_slice = times.as_ref();

        let is_sorted =
            is_sorted.unwrap_or_else(|| time_slice.windows(2).all(|times| times[0] <= times[1]));

        let time_range = if is_sorted {
            // Sorted: min/max are simply the first/last entries.
            let min_time = time_slice
                .first()
                .copied()
                .map_or(TimeInt::MIN, TimeInt::new_temporal);
            let max_time = time_slice
                .last()
                .copied()
                .map_or(TimeInt::MAX, TimeInt::new_temporal);
            ResolvedTimeRange::new(min_time, max_time)
        } else {
            // Unsorted: full scan required.
            let min_time = time_slice
                .iter()
                .min()
                .copied()
                .map_or(TimeInt::MIN, TimeInt::new_temporal);
            let max_time = time_slice
                .iter()
                .max()
                .copied()
                .map_or(TimeInt::MAX, TimeInt::new_temporal);
            ResolvedTimeRange::new(min_time, max_time)
        };

        Self {
            timeline,
            times,
            is_sorted,
            time_range,
        }
    }

    /// Builds a sequence-type time column (e.g. frame numbers).
    ///
    /// Values outside the legal [`TimeInt`] range are clamped to the minimum legal
    /// value, with an error logged.
    pub fn new_sequence(
        name: impl Into<re_log_types::TimelineName>,
        times: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec: Vec<_> = times.into_iter().map(|t| {
            let t = t.into();
            TimeInt::try_from(t)
                .unwrap_or_else(|_| {
                    re_log::error!(
                        illegal_value = t,
                        new_value = TimeInt::MIN.as_i64(),
                        "TimeColumn::new_sequence() called with illegal value - clamped to minimum legal value"
                    );
                    TimeInt::MIN
                })
                .as_i64()
        }).collect();

        Self::new(
            None,
            Timeline::new_sequence(name.into()),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Builds a duration time column from seconds, converted to (rounded) nanoseconds.
    ///
    /// Out-of-range values are clamped to the valid range, with a warning logged.
    pub fn new_duration_secs(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        let time_vec = seconds.into_iter().map(|seconds| {
            let seconds = seconds.into();
            let nanos = (1e9 * seconds).round();
            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
            // Round-trip check: detects both out-of-range values and precision loss.
            if clamped.get() as f64 != nanos {
                re_log::warn!(
                    illegal_value = nanos,
                    new_value = clamped.get(),
                    "TimeColumn::new_duration_secs() called with out-of-range value. Clamped to valid range."
                );
            }
            clamped.get()
        }).collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::DurationNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Builds a timestamp time column from seconds since the Unix epoch, converted to
    /// (rounded) nanoseconds.
    ///
    /// Out-of-range values are clamped to the valid range, with a warning logged.
    pub fn new_timestamp_secs_since_epoch(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        let time_vec = seconds.into_iter().map(|seconds| {
            let seconds = seconds.into();
            let nanos = (1e9 * seconds).round();
            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
            // Round-trip check: detects both out-of-range values and precision loss.
            if clamped.get() as f64 != nanos {
                re_log::warn!(
                    illegal_value = nanos,
                    new_value = clamped.get(),
                    "TimeColumn::new_timestamp_secs_since_epoch() called with out-of-range value. Clamped to valid range."
                );
            }
            clamped.get()
        }).collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::TimestampNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Deprecated backwards-compatibility shim for [`Self::new_duration_secs`].
    #[deprecated = "Use `TimeColumn::new_duration_secs` or `new_timestamp_secs_since_epoch` instead"]
    pub fn new_seconds(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        Self::new_duration_secs(name, seconds)
    }

    /// Builds a duration time column from nanoseconds.
    ///
    /// Illegal values (the reserved minimum) are clamped, with an error logged.
    pub fn new_duration_nanos(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec = nanos
            .into_iter()
            .map(|nanos| {
                let nanos = nanos.into();
                NonMinI64::new(nanos)
                    .unwrap_or_else(|| {
                        re_log::error!(
                            illegal_value = nanos,
                            new_value = TimeInt::MIN.as_i64(),
                            "TimeColumn::new_duration_nanos() called with illegal value - clamped to minimum legal value"
                        );
                        NonMinI64::MIN
                    })
                    .get()
            })
            .collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::DurationNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Builds a timestamp time column from nanoseconds since the Unix epoch.
    ///
    /// Illegal values (the reserved minimum) are clamped, with an error logged.
    pub fn new_timestamp_nanos_since_epoch(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec = nanos
            .into_iter()
            .map(|nanos| {
                let nanos = nanos.into();
                NonMinI64::new(nanos)
                    .unwrap_or_else(|| {
                        re_log::error!(
                            illegal_value = nanos,
                            new_value = TimeInt::MIN.as_i64(),
                            "TimeColumn::new_timestamp_nanos_since_epoch() called with illegal value - clamped to minimum legal value"
                        );
                        NonMinI64::MIN
                    })
                    .get()
            })
            .collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::TimestampNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Deprecated backwards-compatibility shim for [`Self::new_duration_nanos`].
    #[deprecated = "Use `TimeColumn::new_duration_nanos` or `new_timestamp_nanos_since_epoch` instead"]
    pub fn new_nanos(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        Self::new_duration_nanos(name, nanos)
    }

    /// Reads an arrow array into a dense `i64` times buffer.
    ///
    /// Errors if the array contains nulls or has an unsupported datatype.
    pub fn read_array(array: &dyn ArrowArray) -> Result<ArrowScalarBuffer<i64>, TimeColumnError> {
        #![allow(clippy::manual_map)]

        if array.null_count() > 0 {
            Err(TimeColumnError::ContainsNulls)
        } else {
            Self::read_nullable_array(array).map(|(times, _nulls)| times)
        }
    }

    /// Reads an arrow array into an `i64` times buffer plus its optional null mask.
    ///
    /// Supported input types: `Int64`, `TimestampNanosecond`, `Time64Nanosecond`,
    /// `DurationNanosecond`; anything else errors with `UnsupportedDataType`.
    pub fn read_nullable_array(
        array: &dyn ArrowArray,
    ) -> Result<(ArrowScalarBuffer<i64>, Option<ArrowNullBuffer>), TimeColumnError> {
        #![allow(clippy::manual_map)]

        if let Some(times) = array.downcast_array_ref::<arrow::array::Int64Array>() {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::TimestampNanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::Time64NanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::DurationNanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else {
            Err(TimeColumnError::UnsupportedDataType(
                array.data_type().clone(),
            ))
        }
    }
}
1088
impl Chunk {
    /// The unique identifier of this chunk.
    #[inline]
    pub fn id(&self) -> ChunkId {
        self.id
    }

    /// The entity that every row in this chunk refers to.
    #[inline]
    pub fn entity_path(&self) -> &EntityPath {
        &self.entity_path
    }

    /// Total number of columns: the control (row-id) column, plus all time and component
    /// columns.
    #[inline]
    pub fn num_columns(&self) -> usize {
        // Exhaustive destructuring: adding a field to `Chunk` forces this to be revisited.
        let Self {
            id: _,
            entity_path: _, // not an actual column
            heap_size_bytes: _,
            is_sorted: _,
            row_ids: _,
            timelines,
            components,
        } = self;

        // The `1` accounts for the row-id column.
        1 + timelines.len() + components.len()
    }

    /// Number of control columns; currently always 1 (the row-id column).
    #[inline]
    pub fn num_controls(&self) -> usize {
        _ = self;
        1
    }

    /// Number of time columns.
    #[inline]
    pub fn num_timelines(&self) -> usize {
        self.timelines.len()
    }

    /// Number of component columns.
    #[inline]
    pub fn num_components(&self) -> usize {
        self.components.len()
    }

    /// Number of rows.
    #[inline]
    pub fn num_rows(&self) -> usize {
        self.row_ids.len()
    }

    /// Whether this chunk has no rows at all.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// The row ids in their raw arrow form.
    #[inline]
    pub fn row_ids_array(&self) -> &FixedSizeBinaryArray {
        &self.row_ids
    }

    /// The row ids as a native [`RowId`] slice.
    #[inline]
    pub fn row_ids_slice(&self) -> &[RowId] {
        RowId::slice_from_arrow(&self.row_ids)
    }

    /// Iterates over the [`RowId`]s of this chunk, in stored order.
    #[inline]
    pub fn row_ids(&self) -> impl ExactSizeIterator<Item = RowId> + '_ {
        self.row_ids_slice().iter().copied()
    }

    /// Iterates over the [`RowId`]s of the rows where the given component has data
    /// (i.e. a non-null list-array entry).
    ///
    /// Empty if the chunk doesn't contain the component at all.
    #[inline]
    pub fn component_row_ids(
        &self,
        component_descriptor: &ComponentDescriptor,
    ) -> impl Iterator<Item = RowId> + '_ + use<'_> {
        let Some(list_array) = self.components.get(component_descriptor) else {
            return Either::Left(std::iter::empty());
        };

        let row_ids = self.row_ids();

        if let Some(validity) = list_array.nulls() {
            // Only yield row-ids whose entry passes the validity (null) mask.
            Either::Right(Either::Left(
                row_ids
                    .enumerate()
                    .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
            ))
        } else {
            // No validity mask: every row has data.
            Either::Right(Either::Right(row_ids))
        }
    }

    /// The smallest and largest [`RowId`]s in this chunk, or `None` if the chunk is
    /// empty.
    #[inline]
    pub fn row_id_range(&self) -> Option<(RowId, RowId)> {
        if self.is_empty() {
            return None;
        }

        let row_ids = self.row_ids_slice();

        // The unwraps cannot fail: we just checked that the chunk is non-empty.
        #[allow(clippy::unwrap_used)]
        Some(if self.is_sorted() {
            // Sorted: min/max are the first/last entries.
            (
                row_ids.first().copied().unwrap(),
                row_ids.last().copied().unwrap(),
            )
        } else {
            // Unsorted: full scan.
            (
                row_ids.iter().min().copied().unwrap(),
                row_ids.iter().max().copied().unwrap(),
            )
        })
    }

    /// Whether this chunk is static, i.e. has no time columns.
    #[inline]
    pub fn is_static(&self) -> bool {
        self.timelines.is_empty()
    }

    /// All time columns, keyed by timeline name.
    #[inline]
    pub fn timelines(&self) -> &IntMap<TimelineName, TimeColumn> {
        &self.timelines
    }

    /// Iterates over the descriptors of all component columns (cloned).
    #[inline]
    pub fn component_descriptors(&self) -> impl Iterator<Item = ComponentDescriptor> + '_ {
        self.components.keys().cloned()
    }

    /// All component columns.
    #[inline]
    pub fn components(&self) -> &ChunkComponents {
        &self.components
    }
}
1235
impl std::fmt::Display for Chunk {
    /// Renders the chunk as an arrow record batch table, honoring the formatter's width.
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let batch = self.to_record_batch().map_err(|err| {
            // `Display` can't carry an error payload, so log it and return `fmt::Error`.
            re_log::error_once!("couldn't display Chunk: {err}");
            std::fmt::Error
        })?;
        re_format_arrow::format_record_batch_with_width(&batch, f.width()).fmt(f)
    }
}
1246
impl TimeColumn {
    /// The timeline this column belongs to.
    #[inline]
    pub fn timeline(&self) -> &Timeline {
        &self.timeline
    }

    /// The name of the timeline this column belongs to.
    #[inline]
    pub fn name(&self) -> &str {
        self.timeline.name()
    }

    /// The cached time range covering all values in this column.
    #[inline]
    pub fn time_range(&self) -> ResolvedTimeRange {
        self.time_range
    }

    /// The raw time values as an arrow scalar buffer.
    #[inline]
    pub fn times_buffer(&self) -> &ArrowScalarBuffer<i64> {
        &self.times
    }

    /// The time values as an arrow array whose datatype matches the timeline's time type.
    #[inline]
    pub fn times_array(&self) -> ArrowArrayRef {
        self.timeline.typ().make_arrow_array(self.times.clone())
    }

    /// The raw time values as an `i64` slice.
    #[inline]
    pub fn times_raw(&self) -> &[i64] {
        self.times.as_ref()
    }

    /// The time values as [`NonMinI64`]s (saturating on the reserved minimum value).
    #[inline]
    pub fn times_nonmin(&self) -> impl DoubleEndedIterator<Item = NonMinI64> + '_ {
        self.times_raw()
            .iter()
            .copied()
            .map(NonMinI64::saturating_from_i64)
    }

    /// The time values as [`TimeInt`]s.
    #[inline]
    pub fn times(&self) -> impl DoubleEndedIterator<Item = TimeInt> + '_ {
        self.times_raw().iter().copied().map(TimeInt::new_temporal)
    }

    /// Number of rows (timestamps) in this column.
    #[inline]
    pub fn num_rows(&self) -> usize {
        self.times.len()
    }

    /// Whether this column contains no rows at all.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// Computes the time range covered by each component column, i.e. the times of the
    /// first and last rows where that column has non-null data.
    ///
    /// Columns whose validity bitmap is empty are skipped entirely.
    pub fn time_range_per_component(
        &self,
        components: &ChunkComponents,
    ) -> IntMap<ComponentDescriptor, ResolvedTimeRange> {
        let times = self.times_raw();
        components
            .iter()
            .filter_map(|(component_desc, list_array)| {
                if let Some(validity) = list_array.nulls() {
                    // An empty validity mask means the column has no entries at all.
                    if validity.is_empty() {
                        return None;
                    }

                    // Dense column: it covers the whole chunk — reuse the cached range.
                    let is_dense = validity.null_count() == 0;
                    if is_dense {
                        return Some((component_desc.clone(), self.time_range));
                    }

                    // Sparse column: scan from both ends for the first/last valid entry.
                    // NOTE(review): this yields a tight bound only when `times` is
                    // sorted; for unsorted columns the result is the times *at* the
                    // first/last valid rows — confirm callers only use this on sorted
                    // data.
                    let mut time_min = TimeInt::MAX;
                    for (i, time) in times.iter().copied().enumerate() {
                        if validity.is_valid(i) {
                            time_min = TimeInt::new_temporal(time);
                            break;
                        }
                    }

                    let mut time_max = TimeInt::MIN;
                    for (i, time) in times.iter().copied().enumerate().rev() {
                        if validity.is_valid(i) {
                            time_max = TimeInt::new_temporal(time);
                            break;
                        }
                    }

                    Some((
                        component_desc.clone(),
                        ResolvedTimeRange::new(time_min, time_max),
                    ))
                } else {
                    // No validity mask: the column is dense, reuse the cached range.
                    Some((component_desc.clone(), self.time_range))
                }
            })
            .collect()
    }
}
1363
impl re_byte_size::SizeBytes for Chunk {
    /// Lazily computes (and caches) the total heap size of the chunk.
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring: adding a field to `Chunk` forces this to be revisited.
        let Self {
            id,
            entity_path,
            heap_size_bytes,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        // `heap_size_bytes` doubles as a lazily-filled cache, where 0 means "unknown".
        let mut size_bytes = heap_size_bytes.load(Ordering::Relaxed);

        if size_bytes == 0 {
            size_bytes = id.heap_size_bytes()
                + entity_path.heap_size_bytes()
                + is_sorted.heap_size_bytes()
                + row_ids.heap_size_bytes()
                + timelines.heap_size_bytes()
                + components.heap_size_bytes();
            // Benign race: concurrent callers may both compute and store the same value.
            heap_size_bytes.store(size_bytes, Ordering::Relaxed);
        }

        size_bytes
    }
}
1392
impl re_byte_size::SizeBytes for TimeColumn {
    /// Sum of the heap sizes of all fields.
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        // Exhaustive destructuring: adding a field forces this to be revisited.
        let Self {
            timeline,
            times,
            is_sorted,
            time_range,
        } = self;

        timeline.heap_size_bytes()
            + times.heap_size_bytes()
            + is_sorted.heap_size_bytes()
            + time_range.heap_size_bytes()
    }
}
1409
impl Chunk {
    /// Returns an error if any of the chunk's internal invariants is violated.
    ///
    /// Checks: the cached heap size matches a fresh measurement (debug builds only),
    /// the row-id column has the expected datatype and matches the `is_sorted` flag
    /// (debug builds only), every time column and component column has exactly one entry
    /// per row, and no component column is completely empty while the chunk has rows.
    ///
    /// Costly; mostly intended for debug assertions and constructor validation.
    #[track_caller]
    pub fn sanity_check(&self) -> ChunkResult<()> {
        re_tracing::profile_function!();

        // Exhaustive destructuring: adding a field to `Chunk` forces this to be revisited.
        let Self {
            id: _,
            entity_path: _,
            heap_size_bytes,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        // The cached heap size must agree with an actual measurement.
        // Debug builds only: measuring is expensive.
        #[allow(clippy::collapsible_if)]
        if cfg!(debug_assertions) {
            let measured = self.heap_size_bytes();
            let advertised = heap_size_bytes.load(Ordering::Relaxed);
            if advertised != measured {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "Chunk advertises a heap size of {} but we measure {} instead",
                        re_format::format_bytes(advertised as _),
                        re_format::format_bytes(measured as _),
                    ),
                });
            }
        }

        // Row-id column checks.
        {
            if *row_ids.data_type() != RowId::arrow_datatype() {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "RowId data has the wrong datatype: expected {:?} but got {:?} instead",
                        RowId::arrow_datatype(),
                        *row_ids.data_type(),
                    ),
                });
            }

            // The `is_sorted` flag must agree with the data (debug builds only:
            // recomputing sortedness is O(n)).
            #[allow(clippy::collapsible_if)]
            if cfg!(debug_assertions) {
                if *is_sorted != self.is_sorted_uncached() {
                    return Err(ChunkError::Malformed {
                        reason: format!(
                            "Chunk is marked as {}sorted but isn't: {row_ids:?}",
                            if *is_sorted { "" } else { "un" },
                        ),
                    });
                }
            }
        }

        // Every time column must have exactly one timestamp per row, and pass its own
        // sanity checks.
        for (timeline_name, time_column) in timelines {
            if time_column.times.len() != row_ids.len() {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "All timelines in a chunk must have the same number of timestamps, matching the number of row IDs. \
                         Found {} row IDs but {} timestamps for timeline '{timeline_name}'",
                        row_ids.len(),
                        time_column.times.len(),
                    ),
                });
            }

            time_column.sanity_check()?;
        }

        // Every component column must be a list-array with exactly one entry per row,
        // and must not be completely empty while the chunk has rows.
        for (component_desc, list_array) in components.iter() {
            if let Some(c) = component_desc.component_type {
                c.sanity_check();
            }
            if let arrow::datatypes::DataType::List(_field) = list_array.data_type() {
            } else {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "The inner array in a chunked component batch must be a list, got {:?}",
                        list_array.data_type(),
                    ),
                });
            }

            if list_array.len() != row_ids.len() {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "All component batches in a chunk must have the same number of rows, matching the number of row IDs. \
                         Found {} row IDs but {} rows for component batch {component_desc}",
                        row_ids.len(),
                        list_array.len(),
                    ),
                });
            }

            let validity_is_empty = list_array
                .nulls()
                .is_some_and(|validity| validity.is_empty());
            if !self.is_empty() && validity_is_empty {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "All component batches in a chunk must contain at least one non-null entry.\
                         Found a completely empty column for {component_desc}",
                    ),
                });
            }
        }

        Ok(())
    }
}
1532
impl TimeColumn {
    /// Returns an error if any of the column's internal invariants is violated.
    ///
    /// Checks (debug builds only, since they're O(n)): the `is_sorted` flag matches the
    /// data, the cached time range is a tight bound over the data, and no entry equals
    /// [`TimeInt::STATIC`].
    ///
    /// Costly; mostly intended for debug assertions.
    #[track_caller]
    pub fn sanity_check(&self) -> ChunkResult<()> {
        // Exhaustive destructuring: adding a field forces this to be revisited.
        let Self {
            timeline: _,
            times,
            is_sorted,
            time_range,
        } = self;

        let times = times.as_ref();

        // The `is_sorted` flag must agree with the data.
        #[allow(clippy::collapsible_if)]
        if cfg!(debug_assertions) {
            if *is_sorted != times.windows(2).all(|times| times[0] <= times[1]) {
                return Err(ChunkError::Malformed {
                    reason: format!(
                        "Time column is marked as {}sorted but isn't: {times:?}",
                        if *is_sorted { "" } else { "un" },
                    ),
                });
            }
        }

        #[allow(clippy::collapsible_if)]
        if cfg!(debug_assertions) {
            // The cached range must be tight: both bounds must actually be attained
            // by some entry (unless the column is empty)...
            let is_tight_lower_bound = times.iter().any(|&time| time == time_range.min().as_i64());
            let is_tight_upper_bound = times.iter().any(|&time| time == time_range.max().as_i64());
            let is_tight_bound = is_tight_lower_bound && is_tight_upper_bound;

            if !self.is_empty() && !is_tight_bound {
                return Err(ChunkError::Malformed {
                    reason: "Time column's cached time range isn't a tight bound.".to_owned(),
                });
            }

            for &time in times {
                // ...and every entry must fall within it.
                if time < time_range.min().as_i64() || time > time_range.max().as_i64() {
                    return Err(ChunkError::Malformed {
                        reason: format!(
                            "Time column's cached time range is wrong.\
                             Found a time value of {time} while its time range is {time_range:?}",
                        ),
                    });
                }

                // `TimeInt::STATIC` is reserved: static data has no time column at all.
                if time == TimeInt::STATIC.as_i64() {
                    return Err(ChunkError::Malformed {
                        reason: "A chunk's timeline should never contain a static time value."
                            .to_owned(),
                    });
                }
            }
        }

        Ok(())
    }
}