re_chunk/
chunk.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2
3use ahash::HashMap;
4use anyhow::Context as _;
5use arrow::{
6    array::{
7        Array as ArrowArray, ArrayRef as ArrowArrayRef, FixedSizeBinaryArray,
8        ListArray as ArrowListArray,
9    },
10    buffer::{NullBuffer as ArrowNullBuffer, ScalarBuffer as ArrowScalarBuffer},
11};
12use itertools::{Either, Itertools as _, izip};
13use nohash_hasher::IntMap;
14
15use re_arrow_util::{ArrowArrayDowncastRef as _, widen_binary_arrays};
16use re_byte_size::SizeBytes as _;
17use re_log_types::{
18    AbsoluteTimeRange, EntityPath, NonMinI64, TimeInt, TimeType, Timeline, TimelineName,
19};
20use re_types_core::{
21    ComponentDescriptor, ComponentIdentifier, ComponentType, DeserializationError, Loggable as _,
22    SerializationError, SerializedComponentColumn,
23};
24
25use crate::{ChunkId, RowId};
26
27// ---
28
29/// Errors that can occur when creating/manipulating a [`Chunk`]s, directly or indirectly through
30/// the use of a [`crate::ChunkBatcher`].
31#[derive(thiserror::Error, Debug)]
32pub enum ChunkError {
33    #[error("Detected malformed Chunk: {reason}")]
34    Malformed { reason: String },
35
36    #[error("Arrow: {0}")]
37    Arrow(#[from] arrow::error::ArrowError),
38
39    #[error("{kind} index out of bounds: {index} (len={len})")]
40    IndexOutOfBounds {
41        kind: String,
42        len: usize,
43        index: usize,
44    },
45
46    #[error("Serialization: {0}")]
47    Serialization(#[from] SerializationError),
48
49    #[error("Deserialization: {0}")]
50    Deserialization(#[from] DeserializationError),
51
52    #[error(transparent)]
53    UnsupportedTimeType(#[from] re_sorbet::UnsupportedTimeType),
54
55    #[error(transparent)]
56    WrongDatatypeError(#[from] re_sorbet::WrongDatatypeError),
57
58    #[error(transparent)]
59    MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),
60
61    #[error(transparent)]
62    InvalidSorbetSchema(#[from] re_sorbet::SorbetError),
63}
64
65const _: () = assert!(
66    std::mem::size_of::<ChunkError>() <= 72,
67    "Error type is too large. Try to reduce its size by boxing some of its variants.",
68);
69
70pub type ChunkResult<T> = Result<T, ChunkError>;
71
72// ---
73
74#[derive(Debug, Clone, Default, PartialEq)]
75pub struct ChunkComponents(pub IntMap<ComponentIdentifier, SerializedComponentColumn>);
76
77impl ChunkComponents {
78    /// Returns all list arrays for the given component type.
79    ///
80    /// I.e semantically equivalent to `get("MyComponent:*.*")`
81    #[inline]
82    pub fn get_by_component_type(
83        &self,
84        component_type: ComponentType,
85    ) -> impl Iterator<Item = &ArrowListArray> {
86        self.0.values().filter_map(move |column| {
87            (column.descriptor.component_type == Some(component_type)).then_some(&column.list_array)
88        })
89    }
90
91    /// Approximate equal, that ignores small numeric differences.
92    ///
93    /// Returns `Ok` if similar.
94    /// If there is a difference, a description of that difference is returned in `Err`.
95    /// We use [`anyhow`] to provide context.
96    ///
97    /// Useful for tests.
98    pub fn ensure_similar(left: &Self, right: &Self) -> anyhow::Result<()> {
99        anyhow::ensure!(left.len() == right.len());
100        for (component, left_column) in left.iter() {
101            let Some(right_column) = right.get(*component) else {
102                anyhow::bail!("rhs is missing {component:?}");
103            };
104            anyhow::ensure!(left_column.descriptor == right_column.descriptor);
105
106            let left_array = widen_binary_arrays(&left_column.list_array);
107            let right_array = widen_binary_arrays(&right_column.list_array);
108            re_arrow_util::ensure_similar(&left_array.to_data(), &right_array.to_data())
109                .with_context(|| format!("Component {component:?}"))?;
110        }
111        Ok(())
112    }
113
114    /// Whether any of the components in this chunk has the given name.
115    #[inline]
116    pub fn contains_component(&self, component: ComponentIdentifier) -> bool {
117        self.0.contains_key(&component)
118    }
119
120    /// Lists all the component descriptors in this chunk.
121    #[inline]
122    pub fn component_descriptors(&self) -> impl Iterator<Item = &ComponentDescriptor> + '_ {
123        self.0.values().map(|column| &column.descriptor)
124    }
125
126    /// Lists all the component list arrays in this chunk.
127    #[inline]
128    pub fn list_arrays(&self) -> impl Iterator<Item = &ArrowListArray> + '_ {
129        self.0.values().map(|column| &column.list_array)
130    }
131
132    /// Lists all the component list arrays in this chunk.
133    #[inline]
134    pub fn list_arrays_mut(&mut self) -> impl Iterator<Item = &mut ArrowListArray> + '_ {
135        self.0.values_mut().map(|column| &mut column.list_array)
136    }
137
138    /// Returns the array for a given component if any.
139    #[inline]
140    pub fn get_array(&self, component: ComponentIdentifier) -> Option<&ArrowListArray> {
141        self.0.get(&component).map(|column| &column.list_array)
142    }
143
144    /// Returns the descriptor for a given component if any.
145    #[inline]
146    pub fn get_descriptor(&self, component: ComponentIdentifier) -> Option<&ComponentDescriptor> {
147        self.0.get(&component).map(|column| &column.descriptor)
148    }
149
150    /// Returns the descriptor and array for a given component if any.
151    #[inline]
152    pub fn get(&self, component: ComponentIdentifier) -> Option<&SerializedComponentColumn> {
153        self.0.get(&component)
154    }
155
156    /// Unconditionally inserts a [`SerializedComponentColumn`].
157    ///
158    /// Removes and replaces the column if it already exists.
159    #[inline]
160    pub fn insert(
161        &mut self,
162        column: SerializedComponentColumn,
163    ) -> Option<SerializedComponentColumn> {
164        self.0.insert(column.descriptor.component, column)
165    }
166}
167
168impl std::ops::Deref for ChunkComponents {
169    type Target = IntMap<ComponentIdentifier, SerializedComponentColumn>;
170
171    #[inline]
172    fn deref(&self) -> &Self::Target {
173        &self.0
174    }
175}
176
177impl std::ops::DerefMut for ChunkComponents {
178    #[inline]
179    fn deref_mut(&mut self) -> &mut Self::Target {
180        &mut self.0
181    }
182}
183
184// TODO(andreas): Remove this variant, we should let users construct `SerializedComponentColumn` directly to sharpen semantics.
185impl FromIterator<(ComponentDescriptor, ArrowListArray)> for ChunkComponents {
186    #[inline]
187    fn from_iter<T: IntoIterator<Item = (ComponentDescriptor, ArrowListArray)>>(iter: T) -> Self {
188        let mut this = Self::default();
189        {
190            for (component_desc, list_array) in iter {
191                this.insert(SerializedComponentColumn::new(list_array, component_desc));
192            }
193        }
194        this
195    }
196}
197
198impl FromIterator<SerializedComponentColumn> for ChunkComponents {
199    #[inline]
200    fn from_iter<T: IntoIterator<Item = SerializedComponentColumn>>(iter: T) -> Self {
201        let mut this = Self::default();
202        {
203            for serialized in iter {
204                this.insert(serialized);
205            }
206        }
207        this
208    }
209}
210
211/// Dense arrow-based storage of N rows of multi-component multi-temporal data for a specific entity.
212///
213/// This is our core datastructure for logging, storing, querying and transporting data around.
214///
215/// The chunk as a whole is always ascendingly sorted by [`RowId`] before it gets manipulated in any way.
216/// Its time columns might or might not be ascendingly sorted, depending on how the data was logged.
217///
218/// This is the in-memory representation of a chunk, optimized for efficient manipulation of the
219/// data within. For transport, see [`re_sorbet::ChunkBatch`] instead.
220#[derive(Debug)]
221pub struct Chunk {
222    pub(crate) id: ChunkId,
223
224    pub(crate) entity_path: EntityPath,
225
226    /// The heap size of this chunk in bytes.
227    ///
228    /// Must be cached as it is very costly to compute, and needs to be computed repeatedly on the
229    /// hot path (e.g. during garbage collection).
230    pub(crate) heap_size_bytes: AtomicU64,
231
232    /// Is the chunk as a whole sorted by [`RowId`]?
233    pub(crate) is_sorted: bool,
234
235    /// The respective [`RowId`]s for each row of data.
236    pub(crate) row_ids: FixedSizeBinaryArray,
237
238    /// The time columns.
239    ///
240    /// Each column must be the same length as `row_ids`.
241    ///
242    /// Empty if this is a static chunk.
243    pub(crate) timelines: IntMap<TimelineName, TimeColumn>,
244
245    /// A sparse `ListArray` & a [`ComponentDescriptor`] for each component.
246    ///
247    /// Each `ListArray` must be the same length as `row_ids`.
248    ///
249    /// Sparse so that we can e.g. log a `Position` at one timestamp but not a `Color`.
250    pub(crate) components: ChunkComponents,
251}
252
253impl PartialEq for Chunk {
254    #[inline]
255    fn eq(&self, other: &Self) -> bool {
256        let Self {
257            id,
258            entity_path,
259            heap_size_bytes: _,
260            is_sorted,
261            row_ids,
262            timelines,
263            components,
264        } = self;
265
266        *id == other.id
267            && *entity_path == other.entity_path
268            && *is_sorted == other.is_sorted
269            && *row_ids == other.row_ids
270            && *timelines == other.timelines
271            && *components == other.components
272    }
273}
274
275impl Chunk {
276    /// Returns a version of us with a new [`ChunkId`].
277    ///
278    /// Reminder:
279    /// * The returned [`Chunk`] will re-use the exact same [`RowId`]s as `self`.
280    /// * Duplicated [`RowId`]s in the `ChunkStore` is undefined behavior.
281    #[must_use]
282    #[inline]
283    pub fn with_id(mut self, id: ChunkId) -> Self {
284        self.id = id;
285        self
286    }
287
288    /// Returns `Ok` if two [`Chunk`]s are _similar_, although not byte-for-byte equal.
289    ///
290    /// In particular, this ignores chunks and row IDs, as well as `log_time` timestamps.
291    /// It also forgives small numeric inaccuracies in floating point buffers.
292    ///
293    /// If there is a difference, a description of that difference is returned in `Err`.
294    /// We use [`anyhow`] to provide context.
295    ///
296    /// Useful for tests.
297    pub fn ensure_similar(lhs: &Self, rhs: &Self) -> anyhow::Result<()> {
298        let Self {
299            id: _,
300            entity_path,
301            heap_size_bytes: _,
302            is_sorted: _,
303            row_ids: _,
304            timelines,
305            components,
306        } = lhs;
307
308        anyhow::ensure!(*entity_path == rhs.entity_path);
309
310        anyhow::ensure!(timelines.keys().collect_vec() == rhs.timelines.keys().collect_vec());
311
312        for (timeline, left_time_col) in timelines {
313            let right_time_col = rhs
314                .timelines
315                .get(timeline)
316                .ok_or_else(|| anyhow::format_err!("right is missing timeline {timeline:?}"))?;
317            if timeline == &TimelineName::log_time() {
318                continue; // We expect this to differ
319            }
320            if timeline == "sim_time" {
321                continue; // Small numeric differences
322            }
323            anyhow::ensure!(
324                left_time_col == right_time_col,
325                "Timeline differs: {timeline:?}"
326            );
327        }
328
329        // Handle edge case: recording time on partition properties should ignore start time.
330        if entity_path == &EntityPath::properties() {
331            // We're going to filter out some components on both lhs and rhs.
332            // Therefore, it's important that we first check that the number of components is the same.
333            anyhow::ensure!(components.len() == rhs.components.len());
334
335            // Copied from `rerun.archetypes.RecordingInfo`.
336            let recording_time_component: ComponentIdentifier = "RecordingInfo:start_time".into();
337
338            // Filter out the recording time component from both lhs and rhs.
339            let lhs_components = components
340                .iter()
341                .filter(|&(component, _list_array)| component != &recording_time_component)
342                .map(|(component, list_array)| (*component, list_array.clone()))
343                .collect::<IntMap<_, _>>();
344            let rhs_components = rhs
345                .components
346                .iter()
347                .filter(|&(component, _list_array)| component != &recording_time_component)
348                .map(|(component, list_array)| (*component, list_array.clone()))
349                .collect::<IntMap<_, _>>();
350
351            anyhow::ensure!(lhs_components == rhs_components);
352            Ok(())
353        } else {
354            ChunkComponents::ensure_similar(components, &rhs.components)
355        }
356    }
357
358    // Only used for tests atm
359    pub fn are_equal(&self, other: &Self) -> bool {
360        let Self {
361            id,
362            entity_path,
363            heap_size_bytes: _,
364            is_sorted,
365            row_ids,
366            timelines,
367            components,
368        } = self;
369
370        *id == other.id
371            && *entity_path == other.entity_path
372            && *is_sorted == other.is_sorted
373            && row_ids == &other.row_ids
374            && *timelines == other.timelines
375            && components.0 == other.components.0
376    }
377}
378
379impl Clone for Chunk {
380    #[inline]
381    fn clone(&self) -> Self {
382        Self {
383            id: self.id,
384            entity_path: self.entity_path.clone(),
385            heap_size_bytes: AtomicU64::new(self.heap_size_bytes.load(Ordering::Relaxed)),
386            is_sorted: self.is_sorted,
387            row_ids: self.row_ids.clone(),
388            timelines: self.timelines.clone(),
389            components: self.components.clone(),
390        }
391    }
392}
393
394impl Chunk {
395    /// Clones the chunk and assign new IDs to the resulting chunk and its rows.
396    ///
397    /// `first_row_id` will become the [`RowId`] of the first row in the duplicated chunk.
398    /// Each row after that will be monotonically increasing.
399    #[inline]
400    pub fn clone_as(&self, id: ChunkId, first_row_id: RowId) -> Self {
401        let row_ids = std::iter::from_fn({
402            let mut row_id = first_row_id;
403            move || {
404                let yielded = row_id;
405                row_id = row_id.next();
406                Some(yielded)
407            }
408        })
409        .take(self.row_ids.len())
410        .collect_vec();
411
412        Self {
413            id,
414            row_ids: RowId::arrow_from_slice(&row_ids),
415            ..self.clone()
416        }
417    }
418
419    /// Clones the chunk into a new chunk without any time data.
420    #[inline]
421    pub fn into_static(mut self) -> Self {
422        self.timelines.clear();
423        self
424    }
425
426    /// Clones the chunk into a new chunk where all [`RowId`]s are [`RowId::ZERO`].
427    pub fn zeroed(self) -> Self {
428        let row_ids = vec![RowId::ZERO; self.row_ids.len()];
429
430        let row_ids = RowId::arrow_from_slice(&row_ids);
431
432        Self { row_ids, ..self }
433    }
434
435    /// Computes the time range covered by each individual component column on each timeline.
436    ///
437    /// This is different from the time range covered by the [`Chunk`] as a whole because component
438    /// columns are potentially sparse.
439    ///
440    /// This is crucial for indexing and queries to work properly.
441    //
442    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
443    #[inline]
444    pub fn time_range_per_component(
445        &self,
446    ) -> IntMap<TimelineName, IntMap<ComponentIdentifier, AbsoluteTimeRange>> {
447        re_tracing::profile_function!();
448
449        self.timelines
450            .iter()
451            .map(|(timeline_name, time_column)| {
452                (
453                    *timeline_name,
454                    time_column.time_range_per_component(&self.components),
455                )
456            })
457            .collect()
458    }
459
460    #[inline]
461    pub fn component_descriptors(&self) -> impl Iterator<Item = &ComponentDescriptor> + '_ {
462        self.components.component_descriptors()
463    }
464
465    /// The cumulative number of events in this chunk.
466    ///
467    /// I.e. how many _component batches_ ("cells") were logged in total?
468    //
469    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
470    #[inline]
471    pub fn num_events_cumulative(&self) -> u64 {
472        // Reminder: component columns are sparse, we must take a look at the validity bitmaps.
473        self.components
474            .list_arrays()
475            .map(|list_array| {
476                list_array.nulls().map_or_else(
477                    || list_array.len() as u64,
478                    |validity| validity.len() as u64 - validity.null_count() as u64,
479                )
480            })
481            .sum()
482    }
483
484    /// The cumulative number of events in this chunk for each _unique_ timestamp.
485    ///
486    /// I.e. how many _component batches_ ("cells") were logged in total at each timestamp?
487    ///
488    /// Keep in mind that a timestamp can appear multiple times in a [`Chunk`].
489    /// This method will do a sum accumulation to account for these cases (i.e. every timestamp in
490    /// the returned vector is guaranteed to be unique).
491    pub fn num_events_cumulative_per_unique_time(
492        &self,
493        timeline: &TimelineName,
494    ) -> Vec<(TimeInt, u64)> {
495        re_tracing::profile_function!();
496
497        if self.is_static() {
498            return vec![(TimeInt::STATIC, self.num_events_cumulative())];
499        }
500
501        let Some(time_column) = self.timelines().get(timeline) else {
502            return Vec::new();
503        };
504
505        let time_range = time_column.time_range();
506        if time_range.min() == time_range.max() {
507            return vec![(time_range.min(), self.num_events_cumulative())];
508        }
509
510        let counts = if time_column.is_sorted() {
511            self.num_events_cumulative_per_unique_time_sorted(time_column)
512        } else {
513            self.num_events_cumulative_per_unique_time_unsorted(time_column)
514        };
515
516        debug_assert!(
517            counts
518                .iter()
519                .tuple_windows::<(_, _)>()
520                .all(|((time1, _), (time2, _))| time1 < time2)
521        );
522
523        counts
524    }
525
526    fn num_events_cumulative_per_unique_time_sorted(
527        &self,
528        time_column: &TimeColumn,
529    ) -> Vec<(TimeInt, u64)> {
530        re_tracing::profile_function!();
531
532        debug_assert!(time_column.is_sorted());
533
534        // NOTE: This is used on some very hot paths (time panel rendering).
535        // Performance trumps readability. Optimized empirically.
536
537        // Raw, potentially duplicated counts (because timestamps aren't necessarily unique).
538        let mut counts_raw = vec![0u64; self.num_rows()];
539        {
540            self.components.list_arrays().for_each(|list_array| {
541                if let Some(validity) = list_array.nulls() {
542                    validity
543                        .iter()
544                        .enumerate()
545                        .for_each(|(i, is_valid)| counts_raw[i] += is_valid as u64);
546                } else {
547                    for count in &mut counts_raw {
548                        *count += 1;
549                    }
550                }
551            });
552        }
553
554        let mut counts = Vec::with_capacity(counts_raw.len());
555
556        let Some(mut cur_time) = time_column.times().next() else {
557            return Vec::new();
558        };
559        let mut cur_count = 0;
560        izip!(time_column.times(), counts_raw).for_each(|(time, count)| {
561            if time == cur_time {
562                cur_count += count;
563            } else {
564                counts.push((cur_time, cur_count));
565                cur_count = count;
566                cur_time = time;
567            }
568        });
569
570        if counts.last().map(|(time, _)| *time) != Some(cur_time) {
571            counts.push((cur_time, cur_count));
572        }
573
574        counts
575    }
576
577    fn num_events_cumulative_per_unique_time_unsorted(
578        &self,
579        time_column: &TimeColumn,
580    ) -> Vec<(TimeInt, u64)> {
581        re_tracing::profile_function!();
582
583        debug_assert!(!time_column.is_sorted());
584
585        // NOTE: This is used on some very hot paths (time panel rendering).
586
587        let result_unordered =
588            self.components
589                .list_arrays()
590                .fold(HashMap::default(), |acc, list_array| {
591                    if let Some(validity) = list_array.nulls() {
592                        time_column.times().zip(validity.iter()).fold(
593                            acc,
594                            |mut acc, (time, is_valid)| {
595                                *acc.entry(time).or_default() += is_valid as u64;
596                                acc
597                            },
598                        )
599                    } else {
600                        time_column.times().fold(acc, |mut acc, time| {
601                            *acc.entry(time).or_default() += 1;
602                            acc
603                        })
604                    }
605                });
606
607        let mut result = result_unordered.into_iter().collect_vec();
608        result.sort_by_key(|val| val.0);
609        result
610    }
611
612    /// The number of events in this chunk for the specified component.
613    ///
614    /// I.e. how many _component batches_ ("cells") were logged in total for this component?
615    //
616    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
617    #[inline]
618    pub fn num_events_for_component(&self, component: ComponentIdentifier) -> Option<u64> {
619        // Reminder: component columns are sparse, we must check validity bitmap.
620        self.components.get_array(component).map(|list_array| {
621            list_array.nulls().map_or_else(
622                || list_array.len() as u64,
623                |validity| validity.len() as u64 - validity.null_count() as u64,
624            )
625        })
626    }
627
628    /// Computes the `RowId` range covered by each individual component column on each timeline.
629    ///
630    /// This is different from the `RowId` range covered by the [`Chunk`] as a whole because component
631    /// columns are potentially sparse.
632    ///
633    /// This is crucial for indexing and queries to work properly.
634    //
635    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
636    pub fn row_id_range_per_component(&self) -> IntMap<ComponentIdentifier, (RowId, RowId)> {
637        re_tracing::profile_function!();
638
639        let row_ids = self.row_ids().collect_vec();
640
641        if self.is_sorted() {
642            self.components
643                .iter()
644                .filter_map(|(component, column)| {
645                    let mut row_id_min = None;
646                    let mut row_id_max = None;
647
648                    for (i, &row_id) in row_ids.iter().enumerate() {
649                        if column.list_array.is_valid(i) {
650                            row_id_min = Some(row_id);
651                        }
652                    }
653                    for (i, &row_id) in row_ids.iter().enumerate().rev() {
654                        if column.list_array.is_valid(i) {
655                            row_id_max = Some(row_id);
656                        }
657                    }
658
659                    Some((*component, (row_id_min?, row_id_max?)))
660                })
661                .collect()
662        } else {
663            self.components
664                .iter()
665                .filter_map(|(component, column)| {
666                    let mut row_id_min = Some(RowId::MAX);
667                    let mut row_id_max = Some(RowId::ZERO);
668
669                    for (i, &row_id) in row_ids.iter().enumerate() {
670                        if column.list_array.is_valid(i) && Some(row_id) > row_id_min {
671                            row_id_min = Some(row_id);
672                        }
673                    }
674                    for (i, &row_id) in row_ids.iter().enumerate().rev() {
675                        if column.list_array.is_valid(i) && Some(row_id) < row_id_max {
676                            row_id_max = Some(row_id);
677                        }
678                    }
679
680                    Some((*component, (row_id_min?, row_id_max?)))
681                })
682                .collect()
683        }
684    }
685}
686
687// ---
688
689#[derive(Debug, Clone, PartialEq, Eq)]
690pub struct TimeColumn {
691    pub(crate) timeline: Timeline,
692
693    /// Every single timestamp for this timeline.
694    ///
695    /// * This might or might not be sorted, depending on how the data was logged.
696    /// * This is guaranteed to always be dense, because chunks are split anytime a timeline is
697    ///   added or removed.
698    /// * This cannot ever contain `TimeInt::STATIC`, since static data doesn't even have timelines.
699    ///
700    /// When this buffer is converted to an arrow array, it's datatype will depend
701    /// on the timeline type, so it will either become a
702    /// [`arrow::array::Int64Array`] or a [`arrow::array::TimestampNanosecondArray`].
703    pub(crate) times: ArrowScalarBuffer<i64>,
704
705    /// Is [`Self::times`] sorted?
706    ///
707    /// This is completely independent of [`Chunk::is_sorted`]: a timeline doesn't necessarily
708    /// follow the global [`RowId`]-based order, although it does in most cases (happy path).
709    pub(crate) is_sorted: bool,
710
711    /// The time range covered by [`Self::times`].
712    ///
713    /// Not necessarily contiguous! Just the min and max value found in [`Self::times`].
714    pub(crate) time_range: AbsoluteTimeRange,
715}
716
717/// Errors when deserializing/parsing/reading a column of time data.
718#[derive(Debug, thiserror::Error)]
719pub enum TimeColumnError {
720    #[error("Time columns had nulls, but should be dense")]
721    ContainsNulls,
722
723    #[error("Unsupported data type : {0}")]
724    UnsupportedDataType(arrow::datatypes::DataType),
725}
726
727impl Chunk {
728    /// Creates a new [`Chunk`].
729    ///
730    /// This will fail if the passed in data is malformed in any way -- see [`Self::sanity_check`]
731    /// for details.
732    ///
733    /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`.
734    /// When left unspecified (`None`), it will be computed in O(n) time.
735    ///
736    /// For a row-oriented constructor, see [`Self::builder`].
737    pub fn new(
738        id: ChunkId,
739        entity_path: EntityPath,
740        is_sorted: Option<bool>,
741        row_ids: FixedSizeBinaryArray,
742        timelines: IntMap<TimelineName, TimeColumn>,
743        components: ChunkComponents,
744    ) -> ChunkResult<Self> {
745        let mut chunk = Self {
746            id,
747            entity_path,
748            heap_size_bytes: AtomicU64::new(0),
749            is_sorted: false,
750            row_ids,
751            timelines,
752            components,
753        };
754
755        chunk.is_sorted = is_sorted.unwrap_or_else(|| chunk.is_sorted_uncached());
756
757        chunk.sanity_check()?;
758
759        Ok(chunk)
760    }
761
762    /// Creates a new [`Chunk`].
763    ///
764    /// This will fail if the passed in data is malformed in any way -- see [`Self::sanity_check`]
765    /// for details.
766    ///
767    /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`.
768    /// When left unspecified (`None`), it will be computed in O(n) time.
769    ///
770    /// For a row-oriented constructor, see [`Self::builder`].
771    pub fn from_native_row_ids(
772        id: ChunkId,
773        entity_path: EntityPath,
774        is_sorted: Option<bool>,
775        row_ids: &[RowId],
776        timelines: IntMap<TimelineName, TimeColumn>,
777        components: ChunkComponents,
778    ) -> ChunkResult<Self> {
779        re_tracing::profile_function!();
780        let row_ids = RowId::arrow_from_slice(row_ids);
781        Self::new(id, entity_path, is_sorted, row_ids, timelines, components)
782    }
783
784    /// Creates a new [`Chunk`].
785    ///
786    /// This will fail if the passed in data is malformed in any way -- see [`Self::sanity_check`]
787    /// for details.
788    ///
789    /// The data is assumed to be sorted in `RowId`-order. Sequential `RowId`s will be generated for each
790    /// row in the chunk.
791    pub fn from_auto_row_ids(
792        id: ChunkId,
793        entity_path: EntityPath,
794        timelines: IntMap<TimelineName, TimeColumn>,
795        components: ChunkComponents,
796    ) -> ChunkResult<Self> {
797        let count = components
798            .list_arrays()
799            .next()
800            .map_or(0, |list_array| list_array.len());
801
802        let row_ids = std::iter::from_fn({
803            let tuid: re_tuid::Tuid = *id;
804            let mut row_id = RowId::from_tuid(tuid.next());
805            move || {
806                let yielded = row_id;
807                row_id = row_id.next();
808                Some(yielded)
809            }
810        })
811        .take(count)
812        .collect_vec();
813
814        Self::from_native_row_ids(id, entity_path, Some(true), &row_ids, timelines, components)
815    }
816
817    /// Simple helper for [`Self::new`] for static data.
818    ///
819    /// For a row-oriented constructor, see [`Self::builder`].
820    #[inline]
821    pub fn new_static(
822        id: ChunkId,
823        entity_path: EntityPath,
824        is_sorted: Option<bool>,
825        row_ids: FixedSizeBinaryArray,
826        components: ChunkComponents,
827    ) -> ChunkResult<Self> {
828        Self::new(
829            id,
830            entity_path,
831            is_sorted,
832            row_ids,
833            Default::default(),
834            components,
835        )
836    }
837
838    #[inline]
839    pub fn empty(id: ChunkId, entity_path: EntityPath) -> Self {
840        Self {
841            id,
842            entity_path,
843            heap_size_bytes: Default::default(),
844            is_sorted: true,
845            row_ids: RowId::arrow_from_slice(&[]),
846            timelines: Default::default(),
847            components: Default::default(),
848        }
849    }
850
851    /// Unconditionally inserts a [`SerializedComponentColumn`].
852    ///
853    /// Removes and replaces the column if it already exists.
854    ///
855    /// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`].
856    #[inline]
857    pub fn add_component(
858        &mut self,
859        component_column: SerializedComponentColumn,
860    ) -> ChunkResult<()> {
861        self.components.insert(component_column);
862        self.sanity_check()
863    }
864
865    /// Unconditionally inserts a [`TimeColumn`].
866    ///
867    /// Removes and replaces the column if it already exists.
868    ///
869    /// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`].
870    #[inline]
871    pub fn add_timeline(&mut self, chunk_timeline: TimeColumn) -> ChunkResult<()> {
872        self.timelines
873            .insert(*chunk_timeline.timeline.name(), chunk_timeline);
874        self.sanity_check()
875    }
876}
877
878impl TimeColumn {
879    /// Creates a new [`TimeColumn`].
880    ///
881    /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`.
882    /// When left unspecified (`None`), it will be computed in O(n) time.
883    ///
884    /// For a row-oriented constructor, see [`Self::builder`].
885    pub fn new(is_sorted: Option<bool>, timeline: Timeline, times: ArrowScalarBuffer<i64>) -> Self {
886        re_tracing::profile_function_if!(1000 < times.len(), format!("{} times", times.len()));
887
888        let time_slice = times.as_ref();
889
890        let is_sorted =
891            is_sorted.unwrap_or_else(|| time_slice.windows(2).all(|times| times[0] <= times[1]));
892
893        let time_range = if is_sorted {
894            // NOTE: The 'or' in 'map_or' is never hit, but better safe than sorry.
895            let min_time = time_slice
896                .first()
897                .copied()
898                .map_or(TimeInt::MIN, TimeInt::new_temporal);
899            let max_time = time_slice
900                .last()
901                .copied()
902                .map_or(TimeInt::MAX, TimeInt::new_temporal);
903            AbsoluteTimeRange::new(min_time, max_time)
904        } else {
905            // NOTE: Do the iteration multiple times in a cache-friendly way rather than the opposite.
906            // NOTE: The 'or' in 'unwrap_or' is never hit, but better safe than sorry.
907            let min_time = time_slice
908                .iter()
909                .min()
910                .copied()
911                .map_or(TimeInt::MIN, TimeInt::new_temporal);
912            let max_time = time_slice
913                .iter()
914                .max()
915                .copied()
916                .map_or(TimeInt::MAX, TimeInt::new_temporal);
917            AbsoluteTimeRange::new(min_time, max_time)
918        };
919
920        Self {
921            timeline,
922            times,
923            is_sorted,
924            time_range,
925        }
926    }
927
928    /// Creates a new [`TimeColumn`] of sequence type.
929    pub fn new_sequence(
930        name: impl Into<re_log_types::TimelineName>,
931        times: impl IntoIterator<Item = impl Into<i64>>,
932    ) -> Self {
933        let time_vec: Vec<_> = times.into_iter().map(|t| {
934            let t = t.into();
935            TimeInt::try_from(t)
936                .unwrap_or_else(|_| {
937                    re_log::error!(
938                illegal_value = t,
939                new_value = TimeInt::MIN.as_i64(),
940                "TimeColumn::new_sequence() called with illegal value - clamped to minimum legal value"
941            );
942                    TimeInt::MIN
943                })
944                .as_i64()
945        }).collect();
946
947        Self::new(
948            None,
949            Timeline::new_sequence(name.into()),
950            ArrowScalarBuffer::from(time_vec),
951        )
952    }
953
954    /// Creates a new [`TimeColumn`] of duration type, in seconds.
955    pub fn new_duration_secs(
956        name: impl Into<re_log_types::TimelineName>,
957        seconds: impl IntoIterator<Item = impl Into<f64>>,
958    ) -> Self {
959        let time_vec = seconds.into_iter().map(|seconds| {
960            let seconds = seconds.into();
961            let nanos = (1e9 * seconds).round();
962            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
963            if clamped.get() as f64 != nanos {
964                re_log::warn!(
965                    illegal_value = nanos,
966                    new_value = clamped.get(),
967                    "TimeColumn::new_duration_secs() called with out-of-range value. Clamped to valid range."
968                );
969            }
970            clamped.get()
971        }).collect_vec();
972
973        Self::new(
974            None,
975            Timeline::new(name, TimeType::DurationNs),
976            ArrowScalarBuffer::from(time_vec),
977        )
978    }
979
980    /// Creates a new [`TimeColumn`] of duration type, in seconds.
981    pub fn new_timestamp_secs_since_epoch(
982        name: impl Into<re_log_types::TimelineName>,
983        seconds: impl IntoIterator<Item = impl Into<f64>>,
984    ) -> Self {
985        let time_vec = seconds.into_iter().map(|seconds| {
986            let seconds = seconds.into();
987            let nanos = (1e9 * seconds).round();
988            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
989            if clamped.get() as f64 != nanos {
990                re_log::warn!(
991                    illegal_value = nanos,
992                    new_value = clamped.get(),
993                    "TimeColumn::new_timestamp_secs_since_epoch() called with out-of-range value. Clamped to valid range."
994                );
995            }
996            clamped.get()
997        }).collect_vec();
998
999        Self::new(
1000            None,
1001            Timeline::new(name, TimeType::TimestampNs),
1002            ArrowScalarBuffer::from(time_vec),
1003        )
1004    }
1005
1006    /// Creates a new [`TimeColumn`] of duration type, in seconds.
1007    #[deprecated = "Use `TimeColumn::new_duration_secs` or `new_timestamp_secs_since_epoch` instead"]
1008    pub fn new_seconds(
1009        name: impl Into<re_log_types::TimelineName>,
1010        seconds: impl IntoIterator<Item = impl Into<f64>>,
1011    ) -> Self {
1012        Self::new_duration_secs(name, seconds)
1013    }
1014
1015    /// Creates a new [`TimeColumn`] measuring duration in nanoseconds.
1016    pub fn new_duration_nanos(
1017        name: impl Into<re_log_types::TimelineName>,
1018        nanos: impl IntoIterator<Item = impl Into<i64>>,
1019    ) -> Self {
1020        let time_vec = nanos
1021            .into_iter()
1022            .map(|nanos| {
1023                let nanos = nanos.into();
1024                NonMinI64::new(nanos)
1025                    .unwrap_or_else(|| {
1026                        re_log::error!(
1027                            illegal_value = nanos,
1028                            new_value = TimeInt::MIN.as_i64(),
1029                            "TimeColumn::new_duration_nanos() called with illegal value - clamped to minimum legal value"
1030                        );
1031                        NonMinI64::MIN
1032                    })
1033                    .get()
1034            })
1035            .collect_vec();
1036
1037        Self::new(
1038            None,
1039            Timeline::new(name, TimeType::DurationNs),
1040            ArrowScalarBuffer::from(time_vec),
1041        )
1042    }
1043
1044    /// Creates a new [`TimeColumn`] of timestamps, as nanoseconds since unix epoch.
1045    pub fn new_timestamp_nanos_since_epoch(
1046        name: impl Into<re_log_types::TimelineName>,
1047        nanos: impl IntoIterator<Item = impl Into<i64>>,
1048    ) -> Self {
1049        let time_vec = nanos
1050            .into_iter()
1051            .map(|nanos| {
1052                let nanos = nanos.into();
1053                NonMinI64::new(nanos)
1054                    .unwrap_or_else(|| {
1055                        re_log::error!(
1056                            illegal_value = nanos,
1057                            new_value = TimeInt::MIN.as_i64(),
1058                            "TimeColumn::new_timestamp_nanos_since_epoch() called with illegal value - clamped to minimum legal value"
1059                        );
1060                        NonMinI64::MIN
1061                    })
1062                    .get()
1063            })
1064            .collect_vec();
1065
1066        Self::new(
1067            None,
1068            Timeline::new(name, TimeType::TimestampNs),
1069            ArrowScalarBuffer::from(time_vec),
1070        )
1071    }
1072
1073    /// Creates a new [`TimeColumn`] of nanoseconds type.
1074    #[deprecated = "Use `TimeColumn::new_duration_nanos` or `new_timestamp_nanos_since_epoch` instead"]
1075    pub fn new_nanos(
1076        name: impl Into<re_log_types::TimelineName>,
1077        nanos: impl IntoIterator<Item = impl Into<i64>>,
1078    ) -> Self {
1079        Self::new_duration_nanos(name, nanos)
1080    }
1081
1082    /// Parse the given [`ArrowArray`] as a time column.
1083    ///
1084    /// Results in an error if the array is of the wrong datatype, or if it contains nulls.
1085    pub fn read_array(array: &dyn ArrowArray) -> Result<ArrowScalarBuffer<i64>, TimeColumnError> {
1086        if array.null_count() > 0 {
1087            Err(TimeColumnError::ContainsNulls)
1088        } else {
1089            Self::read_nullable_array(array).map(|(times, _nulls)| times)
1090        }
1091    }
1092
1093    /// Parse the given [`ArrowArray`] as a time column where null values are acceptable.
1094    ///
1095    /// Results in an error if the array is of the wrong datatype.
1096    pub fn read_nullable_array(
1097        array: &dyn ArrowArray,
1098    ) -> Result<(ArrowScalarBuffer<i64>, Option<ArrowNullBuffer>), TimeColumnError> {
1099        // Sequence timelines are i64, but time columns are nanoseconds (also as i64).
1100        if let Some(times) = array.downcast_array_ref::<arrow::array::Int64Array>() {
1101            Ok((times.values().clone(), times.nulls().cloned()))
1102        } else if let Some(times) =
1103            array.downcast_array_ref::<arrow::array::TimestampNanosecondArray>()
1104        {
1105            Ok((times.values().clone(), times.nulls().cloned()))
1106        } else if let Some(times) =
1107            array.downcast_array_ref::<arrow::array::Time64NanosecondArray>()
1108        {
1109            Ok((times.values().clone(), times.nulls().cloned()))
1110        } else if let Some(times) =
1111            array.downcast_array_ref::<arrow::array::DurationNanosecondArray>()
1112        {
1113            Ok((times.values().clone(), times.nulls().cloned()))
1114        } else {
1115            Err(TimeColumnError::UnsupportedDataType(
1116                array.data_type().clone(),
1117            ))
1118        }
1119    }
1120}
1121
1122// ---
1123
1124impl Chunk {
1125    #[inline]
1126    pub fn id(&self) -> ChunkId {
1127        self.id
1128    }
1129
1130    #[inline]
1131    pub fn entity_path(&self) -> &EntityPath {
1132        &self.entity_path
1133    }
1134
1135    /// How many columns in total? Includes control, time, and component columns.
1136    #[inline]
1137    pub fn num_columns(&self) -> usize {
1138        let Self {
1139            id: _,
1140            entity_path: _, // not an actual column
1141            heap_size_bytes: _,
1142            is_sorted: _,
1143            row_ids: _,
1144            timelines,
1145            components,
1146        } = self;
1147
1148        1 /* row_ids */ + timelines.len() + components.len()
1149    }
1150
1151    #[inline]
1152    pub fn num_controls(&self) -> usize {
1153        _ = self;
1154        1 /* row_ids */
1155    }
1156
1157    #[inline]
1158    pub fn num_timelines(&self) -> usize {
1159        self.timelines.len()
1160    }
1161
1162    #[inline]
1163    pub fn num_components(&self) -> usize {
1164        self.components.len()
1165    }
1166
1167    #[inline]
1168    pub fn num_rows(&self) -> usize {
1169        self.row_ids.len()
1170    }
1171
1172    #[inline]
1173    pub fn is_empty(&self) -> bool {
1174        self.num_rows() == 0
1175    }
1176
1177    #[inline]
1178    pub fn row_ids_array(&self) -> &FixedSizeBinaryArray {
1179        &self.row_ids
1180    }
1181
1182    #[inline]
1183    pub fn row_ids_slice(&self) -> &[RowId] {
1184        RowId::slice_from_arrow(&self.row_ids)
1185    }
1186
1187    /// All the [`RowId`] in this chunk.
1188    ///
1189    /// This could be in any order if this chunk is unsorted.
1190    #[inline]
1191    pub fn row_ids(&self) -> impl ExactSizeIterator<Item = RowId> + '_ {
1192        self.row_ids_slice().iter().copied()
1193    }
1194
1195    /// Returns an iterator over the [`RowId`]s of a [`Chunk`], for a given component.
1196    ///
1197    /// This is different than [`Self::row_ids`]: it will only yield `RowId`s for rows at which
1198    /// there is data for the specified `component`.
1199    #[inline]
1200    pub fn component_row_ids(
1201        &self,
1202        component: ComponentIdentifier,
1203    ) -> impl Iterator<Item = RowId> + '_ + use<'_> {
1204        let Some(list_array) = self.components.get_array(component) else {
1205            return Either::Left(std::iter::empty());
1206        };
1207
1208        let row_ids = self.row_ids();
1209
1210        if let Some(validity) = list_array.nulls() {
1211            Either::Right(Either::Left(
1212                row_ids
1213                    .enumerate()
1214                    .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
1215            ))
1216        } else {
1217            Either::Right(Either::Right(row_ids))
1218        }
1219    }
1220
1221    /// Returns the [`RowId`]-range covered by this [`Chunk`].
1222    ///
1223    /// `None` if the chunk `is_empty`.
1224    ///
1225    /// This is O(1) if the chunk is sorted, O(n) otherwise.
1226    #[inline]
1227    pub fn row_id_range(&self) -> Option<(RowId, RowId)> {
1228        if self.is_empty() {
1229            return None;
1230        }
1231
1232        let row_ids = self.row_ids_slice();
1233
1234        #[expect(clippy::unwrap_used)] // checked above
1235        Some(if self.is_sorted() {
1236            (
1237                row_ids.first().copied().unwrap(),
1238                row_ids.last().copied().unwrap(),
1239            )
1240        } else {
1241            (
1242                row_ids.iter().min().copied().unwrap(),
1243                row_ids.iter().max().copied().unwrap(),
1244            )
1245        })
1246    }
1247
1248    #[inline]
1249    pub fn is_static(&self) -> bool {
1250        self.timelines.is_empty()
1251    }
1252
1253    #[inline]
1254    pub fn timelines(&self) -> &IntMap<TimelineName, TimeColumn> {
1255        &self.timelines
1256    }
1257
1258    #[inline]
1259    pub fn components_identifiers(&self) -> impl Iterator<Item = ComponentIdentifier> + '_ {
1260        self.components.keys().copied()
1261    }
1262
1263    #[inline]
1264    pub fn components(&self) -> &ChunkComponents {
1265        &self.components
1266    }
1267}
1268
1269impl std::fmt::Display for Chunk {
1270    #[inline]
1271    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1272        let batch = self.to_record_batch().map_err(|err| {
1273            re_log::error_once!("couldn't display Chunk: {err}");
1274            std::fmt::Error
1275        })?;
1276        re_arrow_util::format_record_batch_with_width(&batch, f.width(), f.sign_minus()).fmt(f)
1277    }
1278}
1279
1280impl TimeColumn {
1281    #[inline]
1282    pub fn timeline(&self) -> &Timeline {
1283        &self.timeline
1284    }
1285
1286    #[inline]
1287    pub fn name(&self) -> &str {
1288        self.timeline.name()
1289    }
1290
1291    #[inline]
1292    pub fn time_range(&self) -> AbsoluteTimeRange {
1293        self.time_range
1294    }
1295
1296    #[inline]
1297    pub fn times_buffer(&self) -> &ArrowScalarBuffer<i64> {
1298        &self.times
1299    }
1300
1301    /// Returns an array with the appropriate datatype.
1302    #[inline]
1303    pub fn times_array(&self) -> ArrowArrayRef {
1304        self.timeline.typ().make_arrow_array(self.times.clone())
1305    }
1306
1307    /// All times in a time column are guaranteed not to have the value `i64::MIN`
1308    /// (which is reserved for static data).
1309    #[inline]
1310    pub fn times_raw(&self) -> &[i64] {
1311        self.times.as_ref()
1312    }
1313
1314    /// All times in a time column are guaranteed not to have the value `i64::MIN`
1315    /// (which is reserved for static data).
1316    #[inline]
1317    pub fn times_nonmin(&self) -> impl DoubleEndedIterator<Item = NonMinI64> + '_ {
1318        self.times_raw()
1319            .iter()
1320            .copied()
1321            .map(NonMinI64::saturating_from_i64)
1322    }
1323
1324    #[inline]
1325    pub fn times(&self) -> impl DoubleEndedIterator<Item = TimeInt> + '_ {
1326        self.times_raw().iter().copied().map(TimeInt::new_temporal)
1327    }
1328
1329    #[inline]
1330    pub fn num_rows(&self) -> usize {
1331        self.times.len()
1332    }
1333
1334    #[inline]
1335    pub fn is_empty(&self) -> bool {
1336        self.num_rows() == 0
1337    }
1338
1339    /// Computes the time range covered by each individual component column.
1340    ///
1341    /// This is different from the time range covered by the [`TimeColumn`] as a whole
1342    /// because component columns are potentially sparse.
1343    ///
1344    /// This is crucial for indexing and queries to work properly.
1345    //
1346    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
1347    pub fn time_range_per_component(
1348        &self,
1349        components: &ChunkComponents,
1350    ) -> IntMap<ComponentIdentifier, AbsoluteTimeRange> {
1351        let times = self.times_raw();
1352        components
1353            .iter()
1354            .filter_map(|(component, column)| {
1355                if let Some(validity) = column.list_array.nulls() {
1356                    // Potentially sparse
1357
1358                    if validity.is_empty() {
1359                        return None;
1360                    }
1361
1362                    let is_dense = validity.null_count() == 0;
1363                    if is_dense {
1364                        return Some((*component, self.time_range));
1365                    }
1366
1367                    let mut time_min = TimeInt::MAX;
1368                    for (i, time) in times.iter().copied().enumerate() {
1369                        if validity.is_valid(i) {
1370                            time_min = TimeInt::new_temporal(time);
1371                            break;
1372                        }
1373                    }
1374
1375                    let mut time_max = TimeInt::MIN;
1376                    for (i, time) in times.iter().copied().enumerate().rev() {
1377                        if validity.is_valid(i) {
1378                            time_max = TimeInt::new_temporal(time);
1379                            break;
1380                        }
1381                    }
1382
1383                    Some((*component, AbsoluteTimeRange::new(time_min, time_max)))
1384                } else {
1385                    // Dense
1386
1387                    Some((*component, self.time_range))
1388                }
1389            })
1390            .collect()
1391    }
1392}
1393
1394impl re_byte_size::SizeBytes for Chunk {
1395    #[inline]
1396    fn heap_size_bytes(&self) -> u64 {
1397        let Self {
1398            id,
1399            entity_path,
1400            heap_size_bytes,
1401            is_sorted,
1402            row_ids,
1403            timelines,
1404            components,
1405        } = self;
1406
1407        let mut size_bytes = heap_size_bytes.load(Ordering::Relaxed);
1408
1409        if size_bytes == 0 {
1410            size_bytes = id.heap_size_bytes()
1411                + entity_path.heap_size_bytes()
1412                + is_sorted.heap_size_bytes()
1413                + row_ids.heap_size_bytes()
1414                + timelines.heap_size_bytes()
1415                + components.heap_size_bytes();
1416            heap_size_bytes.store(size_bytes, Ordering::Relaxed);
1417        }
1418
1419        size_bytes
1420    }
1421}
1422
1423impl re_byte_size::SizeBytes for TimeColumn {
1424    #[inline]
1425    fn heap_size_bytes(&self) -> u64 {
1426        let Self {
1427            timeline,
1428            times,
1429            is_sorted,
1430            time_range,
1431        } = self;
1432
1433        timeline.heap_size_bytes()
1434            + times.heap_size_bytes() // cheap
1435            + is_sorted.heap_size_bytes()
1436            + time_range.heap_size_bytes()
1437    }
1438}
1439
1440// --- Sanity checks ---
1441
1442impl Chunk {
1443    /// Returns an error if the Chunk's invariants are not upheld.
1444    ///
1445    /// Costly checks are only run in debug builds.
1446    #[track_caller]
1447    pub fn sanity_check(&self) -> ChunkResult<()> {
1448        re_tracing::profile_function!();
1449
1450        let Self {
1451            id: _,
1452            entity_path: _,
1453            heap_size_bytes,
1454            is_sorted,
1455            row_ids,
1456            timelines,
1457            components,
1458        } = self;
1459
1460        if cfg!(debug_assertions) {
1461            let measured = self.heap_size_bytes();
1462            let advertised = heap_size_bytes.load(Ordering::Relaxed);
1463            if advertised != measured {
1464                return Err(ChunkError::Malformed {
1465                    reason: format!(
1466                        "Chunk advertises a heap size of {} but we measure {} instead",
1467                        re_format::format_bytes(advertised as _),
1468                        re_format::format_bytes(measured as _),
1469                    ),
1470                });
1471            }
1472        }
1473
1474        // Row IDs
1475        {
1476            if *row_ids.data_type() != RowId::arrow_datatype() {
1477                return Err(ChunkError::Malformed {
1478                    reason: format!(
1479                        "RowId data has the wrong datatype: expected {:?} but got {:?} instead",
1480                        RowId::arrow_datatype(),
1481                        *row_ids.data_type(),
1482                    ),
1483                });
1484            }
1485
1486            #[expect(clippy::collapsible_if)] // readability
1487            if cfg!(debug_assertions) {
1488                if *is_sorted != self.is_sorted_uncached() {
1489                    return Err(ChunkError::Malformed {
1490                        reason: format!(
1491                            "Chunk is marked as {}sorted but isn't: {row_ids:?}",
1492                            if *is_sorted { "" } else { "un" },
1493                        ),
1494                    });
1495                }
1496            }
1497        }
1498
1499        // Timelines
1500        for (timeline_name, time_column) in timelines {
1501            if time_column.times.len() != row_ids.len() {
1502                return Err(ChunkError::Malformed {
1503                    reason: format!(
1504                        "All timelines in a chunk must have the same number of timestamps, matching the number of row IDs. \
1505                         Found {} row IDs but {} timestamps for timeline '{timeline_name}'",
1506                        row_ids.len(),
1507                        time_column.times.len(),
1508                    ),
1509                });
1510            }
1511
1512            time_column.sanity_check()?;
1513        }
1514
1515        // Components
1516
1517        for (component, column) in components.iter() {
1518            let SerializedComponentColumn {
1519                list_array,
1520                descriptor,
1521            } = column;
1522
1523            if descriptor.component != *component {
1524                return Err(ChunkError::Malformed {
1525                    reason: format!(
1526                        "Component key & descriptor mismatch. Descriptor: {descriptor:?}. Key: {component:?}",
1527                    ),
1528                });
1529            }
1530
1531            // Ensure that each cell is a list (we don't support mono-components yet).
1532            if let arrow::datatypes::DataType::List(_field) = list_array.data_type() {
1533                // We don't check `field.is_nullable()` here because we support both.
1534                // TODO(#6819): Remove support for inner nullability.
1535            } else {
1536                return Err(ChunkError::Malformed {
1537                    reason: format!(
1538                        "The inner array in a chunked component batch must be a list, got {:?}",
1539                        list_array.data_type(),
1540                    ),
1541                });
1542            }
1543
1544            if list_array.len() != row_ids.len() {
1545                return Err(ChunkError::Malformed {
1546                    reason: format!(
1547                        "All component batches in a chunk must have the same number of rows, matching the number of row IDs. \
1548                             Found {} row IDs but {} rows for component batch {component}",
1549                        row_ids.len(),
1550                        list_array.len(),
1551                    ),
1552                });
1553            }
1554
1555            let validity_is_empty = list_array
1556                .nulls()
1557                .is_some_and(|validity| validity.is_empty());
1558            if !self.is_empty() && validity_is_empty {
1559                return Err(ChunkError::Malformed {
1560                    reason: format!(
1561                        "All component batches in a chunk must contain at least one non-null entry.\
1562                             Found a completely empty column for {component}",
1563                    ),
1564                });
1565            }
1566        }
1567
1568        Ok(())
1569    }
1570}
1571
1572impl TimeColumn {
1573    /// Returns an error if the Chunk's invariants are not upheld.
1574    ///
1575    /// Costly checks are only run in debug builds.
1576    #[track_caller]
1577    pub fn sanity_check(&self) -> ChunkResult<()> {
1578        let Self {
1579            timeline: _,
1580            times,
1581            is_sorted,
1582            time_range,
1583        } = self;
1584
1585        let times = times.as_ref();
1586
1587        if cfg!(debug_assertions)
1588            && *is_sorted != times.windows(2).all(|times| times[0] <= times[1])
1589        {
1590            return Err(ChunkError::Malformed {
1591                reason: format!(
1592                    "Time column is marked as {}sorted but isn't: {times:?}",
1593                    if *is_sorted { "" } else { "un" },
1594                ),
1595            });
1596        }
1597
1598        if cfg!(debug_assertions) {
1599            let is_tight_lower_bound = times.iter().any(|&time| time == time_range.min().as_i64());
1600            let is_tight_upper_bound = times.iter().any(|&time| time == time_range.max().as_i64());
1601            let is_tight_bound = is_tight_lower_bound && is_tight_upper_bound;
1602
1603            if !self.is_empty() && !is_tight_bound {
1604                return Err(ChunkError::Malformed {
1605                    reason: "Time column's cached time range isn't a tight bound.".to_owned(),
1606                });
1607            }
1608
1609            for &time in times {
1610                if time < time_range.min().as_i64() || time > time_range.max().as_i64() {
1611                    return Err(ChunkError::Malformed {
1612                        reason: format!(
1613                            "Time column's cached time range is wrong.\
1614                             Found a time value of {time} while its time range is {time_range:?}",
1615                        ),
1616                    });
1617                }
1618
1619                if time == TimeInt::STATIC.as_i64() {
1620                    return Err(ChunkError::Malformed {
1621                        reason: "A chunk's timeline should never contain a static time value."
1622                            .to_owned(),
1623                    });
1624                }
1625            }
1626        }
1627
1628        Ok(())
1629    }
1630}