re_chunk/
chunk.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2
3use ahash::HashMap;
4use anyhow::Context as _;
5use arrow::{
6    array::{
7        Array as ArrowArray, ArrayRef as ArrowArrayRef, FixedSizeBinaryArray,
8        ListArray as ArrowListArray,
9    },
10    buffer::{NullBuffer as ArrowNullBuffer, ScalarBuffer as ArrowScalarBuffer},
11};
12use itertools::{Either, Itertools as _, izip};
13use nohash_hasher::IntMap;
14
15use re_arrow_util::{ArrowArrayDowncastRef as _, widen_binary_arrays};
16use re_byte_size::SizeBytes as _;
17use re_log_types::{
18    AbsoluteTimeRange, EntityPath, NonMinI64, TimeInt, TimeType, Timeline, TimelineName,
19};
20use re_types_core::{
21    ArchetypeName, ComponentDescriptor, ComponentType, DeserializationError, Loggable as _,
22    SerializationError, SerializedComponentColumn,
23};
24
25use crate::{ChunkId, RowId};
26
27// ---
28
29/// Errors that can occur when creating/manipulating a [`Chunk`]s, directly or indirectly through
30/// the use of a [`crate::ChunkBatcher`].
/// Errors that can occur when creating/manipulating [`Chunk`]s, directly or indirectly through
/// the use of a [`crate::ChunkBatcher`].
#[derive(thiserror::Error, Debug)]
pub enum ChunkError {
    /// The chunk violates one of its internal invariants (see `Chunk::sanity_check`).
    #[error("Detected malformed Chunk: {reason}")]
    Malformed { reason: String },

    /// An error bubbled up from the underlying arrow implementation.
    #[error("Arrow: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// An index (row, component, …) was out of bounds for the column it targets.
    #[error("{kind} index out of bounds: {index} (len={len})")]
    IndexOutOfBounds {
        kind: String,
        len: usize,
        index: usize,
    },

    /// Failed to serialize component data into arrow.
    #[error("Serialization: {0}")]
    Serialization(#[from] SerializationError),

    /// Failed to deserialize component data out of arrow.
    #[error("Deserialization: {0}")]
    Deserialization(#[from] DeserializationError),

    #[error(transparent)]
    UnsupportedTimeType(#[from] re_sorbet::UnsupportedTimeType),

    #[error(transparent)]
    WrongDatatypeError(#[from] re_sorbet::WrongDatatypeError),

    #[error(transparent)]
    MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),

    #[error(transparent)]
    InvalidSorbetSchema(#[from] re_sorbet::SorbetError),
}
64
// Keep the error type small: it travels inside every `ChunkResult`, so a large `Err`
// variant would bloat every `Result` returned by this crate.
const _: () = assert!(
    std::mem::size_of::<ChunkError>() <= 72,
    "Error type is too large. Try to reduce its size by boxing some of its variants.",
);

/// Convenience alias for `Result`s produced by chunk creation/manipulation.
pub type ChunkResult<T> = Result<T, ChunkError>;
71
72// ---
73
/// The component columns of a [`Chunk`]: one (potentially sparse) `ListArray` per
/// [`ComponentDescriptor`].
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ChunkComponents(pub IntMap<ComponentDescriptor, ArrowListArray>);
76
77impl ChunkComponents {
78    /// Returns all list arrays for the given component type.
79    ///
80    /// I.e semantically equivalent to `get("MyComponent:*.*")`
81    #[inline]
82    pub fn get_by_component_type(
83        &self,
84        component_type: ComponentType,
85    ) -> impl Iterator<Item = &ArrowListArray> {
86        self.0.iter().filter_map(move |(desc, array)| {
87            (desc.component_type == Some(component_type)).then_some(array)
88        })
89    }
90
91    /// Approximate equal, that ignores small numeric differences.
92    ///
93    /// Returns `Ok` if similar.
94    /// If there is a difference, a description of that difference is returned in `Err`.
95    /// We use [`anyhow`] to provide context.
96    ///
97    /// Useful for tests.
98    pub fn ensure_similar(left: &Self, right: &Self) -> anyhow::Result<()> {
99        anyhow::ensure!(left.len() == right.len());
100        for (descr, left_array) in left.iter() {
101            let Some(right_array) = right.get(descr) else {
102                anyhow::bail!("rhs is missing {descr:?}");
103            };
104            let left_array = widen_binary_arrays(left_array);
105            let right_array = widen_binary_arrays(right_array);
106            re_arrow_util::ensure_similar(&left_array.to_data(), &right_array.to_data())
107                .with_context(|| format!("Component {descr:?}"))?;
108        }
109        Ok(())
110    }
111
112    /// Whether any of the components in this chunk has the given name.
113    pub fn contains_component(&self, component_descr: &ComponentDescriptor) -> bool {
114        self.0.contains_key(component_descr)
115    }
116
117    /// Whether any of the components in this chunk is tagged with the given archetype name.
118    pub fn has_component_with_archetype(&self, archetype_name: ArchetypeName) -> bool {
119        self.0
120            .keys()
121            .any(|desc| desc.archetype == Some(archetype_name))
122    }
123}
124
// Transparent access to the underlying map, so callers can use `IntMap` methods
// (`get`, `iter`, `len`, …) directly on `ChunkComponents`.
impl std::ops::Deref for ChunkComponents {
    type Target = IntMap<ComponentDescriptor, ArrowListArray>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
133
// Mutable counterpart of the `Deref` impl above (`insert`, `remove`, …).
impl std::ops::DerefMut for ChunkComponents {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
140
141impl FromIterator<(ComponentDescriptor, ArrowListArray)> for ChunkComponents {
142    #[inline]
143    fn from_iter<T: IntoIterator<Item = (ComponentDescriptor, ArrowListArray)>>(iter: T) -> Self {
144        let mut this = Self::default();
145        {
146            for (component_desc, list_array) in iter {
147                this.insert(component_desc, list_array);
148            }
149        }
150        this
151    }
152}
153
154impl FromIterator<SerializedComponentColumn> for ChunkComponents {
155    #[inline]
156    fn from_iter<T: IntoIterator<Item = SerializedComponentColumn>>(iter: T) -> Self {
157        let mut this = Self::default();
158        {
159            for serialized in iter {
160                this.insert(serialized.descriptor, serialized.list_array);
161            }
162        }
163        this
164    }
165}
166
167/// Dense arrow-based storage of N rows of multi-component multi-temporal data for a specific entity.
168///
169/// This is our core datastructure for logging, storing, querying and transporting data around.
170///
171/// The chunk as a whole is always ascendingly sorted by [`RowId`] before it gets manipulated in any way.
172/// Its time columns might or might not be ascendingly sorted, depending on how the data was logged.
173///
174/// This is the in-memory representation of a chunk, optimized for efficient manipulation of the
175/// data within. For transport, see [`re_sorbet::ChunkBatch`] instead.
#[derive(Debug)]
pub struct Chunk {
    /// The unique identifier of this chunk.
    pub(crate) id: ChunkId,

    /// Which entity all rows in this chunk belong to.
    pub(crate) entity_path: EntityPath,

    /// The heap size of this chunk in bytes.
    ///
    /// Must be cached as it is very costly to compute, and needs to be computed repeatedly on the
    /// hot path (e.g. during garbage collection).
    pub(crate) heap_size_bytes: AtomicU64,

    /// Is the chunk as a whole sorted by [`RowId`]?
    pub(crate) is_sorted: bool,

    /// The respective [`RowId`]s for each row of data.
    pub(crate) row_ids: FixedSizeBinaryArray,

    /// The time columns.
    ///
    /// Each column must be the same length as `row_ids`.
    ///
    /// Empty if this is a static chunk.
    pub(crate) timelines: IntMap<TimelineName, TimeColumn>,

    /// A sparse `ListArray` for each component.
    ///
    /// Each `ListArray` must be the same length as `row_ids`.
    ///
    /// Sparse so that we can e.g. log a `Position` at one timestamp but not a `Color`.
    pub(crate) components: ChunkComponents,
}
208
impl PartialEq for Chunk {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive destructuring, so that adding a field to `Chunk` forces this
        // implementation to be revisited.
        let Self {
            id,
            entity_path,
            heap_size_bytes: _, // Derived cache -- not part of the chunk's identity.
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        *id == other.id
            && *entity_path == other.entity_path
            && *is_sorted == other.is_sorted
            && *row_ids == other.row_ids
            && *timelines == other.timelines
            && *components == other.components
    }
}
230
impl Chunk {
    /// Returns a version of us with a new [`ChunkId`].
    ///
    /// Reminder:
    /// * The returned [`Chunk`] will re-use the exact same [`RowId`]s as `self`.
    /// * Duplicated [`RowId`]s in the `ChunkStore` is undefined behavior.
    #[must_use]
    #[inline]
    pub fn with_id(mut self, id: ChunkId) -> Self {
        self.id = id;
        self
    }

    /// Returns `Ok` if two [`Chunk`]s are _similar_, although not byte-for-byte equal.
    ///
    /// In particular, this ignores chunks and row IDs, as well as `log_time` timestamps.
    /// It also forgives small numeric inaccuracies in floating point buffers.
    ///
    /// If there is a difference, a description of that difference is returned in `Err`.
    /// We use [`anyhow`] to provide context.
    ///
    /// Useful for tests.
    pub fn ensure_similar(lhs: &Self, rhs: &Self) -> anyhow::Result<()> {
        // Exhaustive destructuring, so that adding a field to `Chunk` forces this
        // comparison to be revisited.
        let Self {
            id: _,
            entity_path,
            heap_size_bytes: _,
            is_sorted: _,
            row_ids: _,
            timelines,
            components,
        } = lhs;

        anyhow::ensure!(*entity_path == rhs.entity_path);

        // Same set of timelines, in the same iteration order.
        anyhow::ensure!(timelines.keys().collect_vec() == rhs.timelines.keys().collect_vec());

        for (timeline, left_time_col) in timelines {
            let right_time_col = rhs
                .timelines
                .get(timeline)
                .ok_or_else(|| anyhow::format_err!("right is missing timeline {timeline:?}"))?;
            if timeline == &TimelineName::log_time() {
                continue; // We expect this to differ
            }
            if timeline == "sim_time" {
                continue; // Small numeric differences
            }
            anyhow::ensure!(
                left_time_col == right_time_col,
                "Timeline differs: {timeline:?}"
            );
        }

        // Handle edge case: recording time on partition properties should ignore start time.
        if entity_path == &EntityPath::properties() {
            // We're going to filter out some components on both lhs and rhs.
            // Therefore, it's important that we first check that the number of components is the same.
            anyhow::ensure!(components.len() == rhs.components.len());

            // Copied from `rerun.archetypes.RecordingInfo`.
            let recording_time_descriptor = ComponentDescriptor {
                archetype: Some("rerun.archetypes.RecordingInfo".into()),
                component: "RecordingInfo:start_time".into(),
                component_type: Some("rerun.components.Timestamp".into()),
            };

            // Filter out the recording time component from both lhs and rhs.
            let lhs_components = components
                .iter()
                .filter(|&(desc, _list_array)| desc != &recording_time_descriptor)
                .map(|(desc, list_array)| (desc.clone(), list_array.clone()))
                .collect::<IntMap<_, _>>();
            let rhs_components = rhs
                .components
                .iter()
                .filter(|&(desc, _list_array)| desc != &recording_time_descriptor)
                .map(|(desc, list_array)| (desc.clone(), list_array.clone()))
                .collect::<IntMap<_, _>>();

            anyhow::ensure!(lhs_components == rhs_components);
            Ok(())
        } else {
            ChunkComponents::ensure_similar(components, &rhs.components)
        }
    }

    // Only used for tests atm
    pub fn are_equal(&self, other: &Self) -> bool {
        let Self {
            id,
            entity_path,
            heap_size_bytes: _, // Derived cache -- not part of the chunk's identity.
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        // Compare component columns through maps keyed by descriptor, i.e. independently
        // of their in-memory iteration order.
        let my_components: IntMap<_, _> = components
            .iter()
            .map(|(descr, list_array)| (descr.clone(), list_array))
            .collect();

        let other_components: IntMap<_, _> = other
            .components
            .iter()
            .map(|(descr, list_array)| (descr.clone(), list_array))
            .collect();

        *id == other.id
            && *entity_path == other.entity_path
            && *is_sorted == other.is_sorted
            && row_ids == &other.row_ids
            && *timelines == other.timelines
            && my_components == other_components
    }
}
349
// Hand-written because `AtomicU64` does not implement `Clone`: the cached heap size
// is carried over with an atomic load instead.
impl Clone for Chunk {
    #[inline]
    fn clone(&self) -> Self {
        Self {
            id: self.id,
            entity_path: self.entity_path.clone(),
            heap_size_bytes: AtomicU64::new(self.heap_size_bytes.load(Ordering::Relaxed)),
            is_sorted: self.is_sorted,
            row_ids: self.row_ids.clone(),
            timelines: self.timelines.clone(),
            components: self.components.clone(),
        }
    }
}
364
365impl Chunk {
366    /// Clones the chunk and assign new IDs to the resulting chunk and its rows.
367    ///
368    /// `first_row_id` will become the [`RowId`] of the first row in the duplicated chunk.
369    /// Each row after that will be monotonically increasing.
370    #[inline]
371    pub fn clone_as(&self, id: ChunkId, first_row_id: RowId) -> Self {
372        let row_ids = std::iter::from_fn({
373            let mut row_id = first_row_id;
374            move || {
375                let yielded = row_id;
376                row_id = row_id.next();
377                Some(yielded)
378            }
379        })
380        .take(self.row_ids.len())
381        .collect_vec();
382
383        Self {
384            id,
385            row_ids: RowId::arrow_from_slice(&row_ids),
386            ..self.clone()
387        }
388    }
389
390    /// Clones the chunk into a new chunk without any time data.
391    #[inline]
392    pub fn into_static(mut self) -> Self {
393        self.timelines.clear();
394        self
395    }
396
397    /// Clones the chunk into a new chunk where all [`RowId`]s are [`RowId::ZERO`].
398    pub fn zeroed(self) -> Self {
399        let row_ids = vec![RowId::ZERO; self.row_ids.len()];
400
401        let row_ids = RowId::arrow_from_slice(&row_ids);
402
403        Self { row_ids, ..self }
404    }
405
406    /// Computes the time range covered by each individual component column on each timeline.
407    ///
408    /// This is different from the time range covered by the [`Chunk`] as a whole because component
409    /// columns are potentially sparse.
410    ///
411    /// This is crucial for indexing and queries to work properly.
412    //
413    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
414    #[inline]
415    pub fn time_range_per_component(
416        &self,
417    ) -> IntMap<TimelineName, IntMap<ComponentDescriptor, AbsoluteTimeRange>> {
418        re_tracing::profile_function!();
419
420        self.timelines
421            .iter()
422            .map(|(timeline_name, time_column)| {
423                (
424                    *timeline_name,
425                    time_column.time_range_per_component(&self.components),
426                )
427            })
428            .collect()
429    }
430
431    /// The cumulative number of events in this chunk.
432    ///
433    /// I.e. how many _component batches_ ("cells") were logged in total?
434    //
435    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
436    #[inline]
437    pub fn num_events_cumulative(&self) -> u64 {
438        // Reminder: component columns are sparse, we must take a look at the validity bitmaps.
439        self.components
440            .values()
441            .map(|list_array| {
442                list_array.nulls().map_or_else(
443                    || list_array.len() as u64,
444                    |validity| validity.len() as u64 - validity.null_count() as u64,
445                )
446            })
447            .sum()
448    }
449
450    /// The cumulative number of events in this chunk for each _unique_ timestamp.
451    ///
452    /// I.e. how many _component batches_ ("cells") were logged in total at each timestamp?
453    ///
454    /// Keep in mind that a timestamp can appear multiple times in a [`Chunk`].
455    /// This method will do a sum accumulation to account for these cases (i.e. every timestamp in
456    /// the returned vector is guaranteed to be unique).
457    pub fn num_events_cumulative_per_unique_time(
458        &self,
459        timeline: &TimelineName,
460    ) -> Vec<(TimeInt, u64)> {
461        re_tracing::profile_function!();
462
463        if self.is_static() {
464            return vec![(TimeInt::STATIC, self.num_events_cumulative())];
465        }
466
467        let Some(time_column) = self.timelines().get(timeline) else {
468            return Vec::new();
469        };
470
471        let time_range = time_column.time_range();
472        if time_range.min() == time_range.max() {
473            return vec![(time_range.min(), self.num_events_cumulative())];
474        }
475
476        let counts = if time_column.is_sorted() {
477            self.num_events_cumulative_per_unique_time_sorted(time_column)
478        } else {
479            self.num_events_cumulative_per_unique_time_unsorted(time_column)
480        };
481
482        debug_assert!(
483            counts
484                .iter()
485                .tuple_windows::<(_, _)>()
486                .all(|((time1, _), (time2, _))| time1 < time2)
487        );
488
489        counts
490    }
491
492    fn num_events_cumulative_per_unique_time_sorted(
493        &self,
494        time_column: &TimeColumn,
495    ) -> Vec<(TimeInt, u64)> {
496        re_tracing::profile_function!();
497
498        debug_assert!(time_column.is_sorted());
499
500        // NOTE: This is used on some very hot paths (time panel rendering).
501        // Performance trumps readability. Optimized empirically.
502
503        // Raw, potentially duplicated counts (because timestamps aren't necessarily unique).
504        let mut counts_raw = vec![0u64; self.num_rows()];
505        {
506            self.components.values().for_each(|list_array| {
507                if let Some(validity) = list_array.nulls() {
508                    validity
509                        .iter()
510                        .enumerate()
511                        .for_each(|(i, is_valid)| counts_raw[i] += is_valid as u64);
512                } else {
513                    for count in &mut counts_raw {
514                        *count += 1;
515                    }
516                }
517            });
518        }
519
520        let mut counts = Vec::with_capacity(counts_raw.len());
521
522        let Some(mut cur_time) = time_column.times().next() else {
523            return Vec::new();
524        };
525        let mut cur_count = 0;
526        izip!(time_column.times(), counts_raw).for_each(|(time, count)| {
527            if time == cur_time {
528                cur_count += count;
529            } else {
530                counts.push((cur_time, cur_count));
531                cur_count = count;
532                cur_time = time;
533            }
534        });
535
536        if counts.last().map(|(time, _)| *time) != Some(cur_time) {
537            counts.push((cur_time, cur_count));
538        }
539
540        counts
541    }
542
543    fn num_events_cumulative_per_unique_time_unsorted(
544        &self,
545        time_column: &TimeColumn,
546    ) -> Vec<(TimeInt, u64)> {
547        re_tracing::profile_function!();
548
549        debug_assert!(!time_column.is_sorted());
550
551        // NOTE: This is used on some very hot paths (time panel rendering).
552
553        let result_unordered =
554            self.components
555                .values()
556                .fold(HashMap::default(), |acc, list_array| {
557                    if let Some(validity) = list_array.nulls() {
558                        time_column.times().zip(validity.iter()).fold(
559                            acc,
560                            |mut acc, (time, is_valid)| {
561                                *acc.entry(time).or_default() += is_valid as u64;
562                                acc
563                            },
564                        )
565                    } else {
566                        time_column.times().fold(acc, |mut acc, time| {
567                            *acc.entry(time).or_default() += 1;
568                            acc
569                        })
570                    }
571                });
572
573        let mut result = result_unordered.into_iter().collect_vec();
574        result.sort_by_key(|val| val.0);
575        result
576    }
577
578    /// The number of events in this chunk for the specified component.
579    ///
580    /// I.e. how many _component batches_ ("cells") were logged in total for this component?
581    //
582    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
583    #[inline]
584    pub fn num_events_for_component(
585        &self,
586        component_descriptor: &ComponentDescriptor,
587    ) -> Option<u64> {
588        // Reminder: component columns are sparse, we must check validity bitmap.
589        self.components.get(component_descriptor).map(|list_array| {
590            list_array.nulls().map_or_else(
591                || list_array.len() as u64,
592                |validity| validity.len() as u64 - validity.null_count() as u64,
593            )
594        })
595    }
596
597    /// Computes the `RowId` range covered by each individual component column on each timeline.
598    ///
599    /// This is different from the `RowId` range covered by the [`Chunk`] as a whole because component
600    /// columns are potentially sparse.
601    ///
602    /// This is crucial for indexing and queries to work properly.
603    //
604    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
605    pub fn row_id_range_per_component(&self) -> IntMap<ComponentDescriptor, (RowId, RowId)> {
606        re_tracing::profile_function!();
607
608        let row_ids = self.row_ids().collect_vec();
609
610        if self.is_sorted() {
611            self.components
612                .iter()
613                .filter_map(|(component_desc, list_array)| {
614                    let mut row_id_min = None;
615                    let mut row_id_max = None;
616
617                    for (i, &row_id) in row_ids.iter().enumerate() {
618                        if list_array.is_valid(i) {
619                            row_id_min = Some(row_id);
620                        }
621                    }
622                    for (i, &row_id) in row_ids.iter().enumerate().rev() {
623                        if list_array.is_valid(i) {
624                            row_id_max = Some(row_id);
625                        }
626                    }
627
628                    Some((component_desc.clone(), (row_id_min?, row_id_max?)))
629                })
630                .collect()
631        } else {
632            self.components
633                .iter()
634                .filter_map(|(component_desc, list_array)| {
635                    let mut row_id_min = Some(RowId::MAX);
636                    let mut row_id_max = Some(RowId::ZERO);
637
638                    for (i, &row_id) in row_ids.iter().enumerate() {
639                        if list_array.is_valid(i) && Some(row_id) > row_id_min {
640                            row_id_min = Some(row_id);
641                        }
642                    }
643                    for (i, &row_id) in row_ids.iter().enumerate().rev() {
644                        if list_array.is_valid(i) && Some(row_id) < row_id_max {
645                            row_id_max = Some(row_id);
646                        }
647                    }
648
649                    Some((component_desc.clone(), (row_id_min?, row_id_max?)))
650                })
651                .collect()
652        }
653    }
654}
655
656// ---
657
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeColumn {
    /// Which timeline these timestamps belong to.
    pub(crate) timeline: Timeline,

    /// Every single timestamp for this timeline.
    ///
    /// * This might or might not be sorted, depending on how the data was logged.
    /// * This is guaranteed to always be dense, because chunks are split anytime a timeline is
    ///   added or removed.
    /// * This cannot ever contain `TimeInt::STATIC`, since static data doesn't even have timelines.
    ///
    /// When this buffer is converted to an arrow array, it's datatype will depend
    /// on the timeline type, so it will either become a
    /// [`arrow::array::Int64Array`] or a [`arrow::array::TimestampNanosecondArray`].
    pub(crate) times: ArrowScalarBuffer<i64>,

    /// Is [`Self::times`] sorted?
    ///
    /// This is completely independent of [`Chunk::is_sorted`]: a timeline doesn't necessarily
    /// follow the global [`RowId`]-based order, although it does in most cases (happy path).
    pub(crate) is_sorted: bool,

    /// The time range covered by [`Self::times`].
    ///
    /// Not necessarily contiguous! Just the min and max value found in [`Self::times`].
    pub(crate) time_range: AbsoluteTimeRange,
}
685
686/// Errors when deserializing/parsing/reading a column of time data.
687#[derive(Debug, thiserror::Error)]
688pub enum TimeColumnError {
689    #[error("Time columns had nulls, but should be dense")]
690    ContainsNulls,
691
692    #[error("Unsupported data type : {0}")]
693    UnsupportedDataType(arrow::datatypes::DataType),
694}
695
impl Chunk {
    /// Creates a new [`Chunk`].
    ///
    /// This will fail if the passed in data is malformed in any way -- see [`Self::sanity_check`]
    /// for details.
    ///
    /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`.
    /// When left unspecified (`None`), it will be computed in O(n) time.
    ///
    /// For a row-oriented constructor, see [`Self::builder`].
    pub fn new(
        id: ChunkId,
        entity_path: EntityPath,
        is_sorted: Option<bool>,
        row_ids: FixedSizeBinaryArray,
        timelines: IntMap<TimelineName, TimeColumn>,
        components: ChunkComponents,
    ) -> ChunkResult<Self> {
        let mut chunk = Self {
            id,
            entity_path,
            heap_size_bytes: AtomicU64::new(0),
            is_sorted: false, // placeholder -- computed right below, once the chunk exists
            row_ids,
            timelines,
            components,
        };

        chunk.is_sorted = is_sorted.unwrap_or_else(|| chunk.is_sorted_uncached());

        chunk.sanity_check()?;

        Ok(chunk)
    }

    /// Creates a new [`Chunk`].
    ///
    /// This will fail if the passed in data is malformed in any way -- see [`Self::sanity_check`]
    /// for details.
    ///
    /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`.
    /// When left unspecified (`None`), it will be computed in O(n) time.
    ///
    /// For a row-oriented constructor, see [`Self::builder`].
    pub fn from_native_row_ids(
        id: ChunkId,
        entity_path: EntityPath,
        is_sorted: Option<bool>,
        row_ids: &[RowId],
        timelines: IntMap<TimelineName, TimeColumn>,
        components: ChunkComponents,
    ) -> ChunkResult<Self> {
        re_tracing::profile_function!();
        let row_ids = RowId::arrow_from_slice(row_ids);
        Self::new(id, entity_path, is_sorted, row_ids, timelines, components)
    }

    /// Creates a new [`Chunk`].
    ///
    /// This will fail if the passed in data is malformed in any way -- see [`Self::sanity_check`]
    /// for details.
    ///
    /// The data is assumed to be sorted in `RowId`-order. Sequential `RowId`s will be generated for each
    /// row in the chunk.
    pub fn from_auto_row_ids(
        id: ChunkId,
        entity_path: EntityPath,
        timelines: IntMap<TimelineName, TimeColumn>,
        components: ChunkComponents,
    ) -> ChunkResult<Self> {
        // All component columns must have the same length, so any one of them gives the row count.
        let count = components
            .iter()
            .next()
            .map_or(0, |(_, list_array)| list_array.len());

        // Row ids are seeded from the chunk's own TUID, then increase monotonically.
        let row_ids = std::iter::from_fn({
            let tuid: re_tuid::Tuid = *id;
            let mut row_id = RowId::from_tuid(tuid.next());
            move || {
                let yielded = row_id;
                row_id = row_id.next();
                Some(yielded)
            }
        })
        .take(count)
        .collect_vec();

        Self::from_native_row_ids(id, entity_path, Some(true), &row_ids, timelines, components)
    }

    /// Simple helper for [`Self::new`] for static data.
    ///
    /// For a row-oriented constructor, see [`Self::builder`].
    #[inline]
    pub fn new_static(
        id: ChunkId,
        entity_path: EntityPath,
        is_sorted: Option<bool>,
        row_ids: FixedSizeBinaryArray,
        components: ChunkComponents,
    ) -> ChunkResult<Self> {
        Self::new(
            id,
            entity_path,
            is_sorted,
            row_ids,
            Default::default(), // static data has no time columns
            components,
        )
    }

    /// Creates an empty [`Chunk`] (no rows, no time columns, no components).
    #[inline]
    pub fn empty(id: ChunkId, entity_path: EntityPath) -> Self {
        Self {
            id,
            entity_path,
            heap_size_bytes: Default::default(),
            is_sorted: true, // trivially sorted: there are no rows
            row_ids: RowId::arrow_from_slice(&[]),
            timelines: Default::default(),
            components: Default::default(),
        }
    }

    /// Unconditionally inserts an [`ArrowListArray`] as a component column.
    ///
    /// Removes and replaces the column if it already exists.
    ///
    /// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`].
    #[inline]
    pub fn add_component(
        &mut self,
        component_desc: ComponentDescriptor,
        list_array: ArrowListArray,
    ) -> ChunkResult<()> {
        self.components.insert(component_desc, list_array);
        self.sanity_check()
    }

    /// Unconditionally inserts a [`TimeColumn`].
    ///
    /// Removes and replaces the column if it already exists.
    ///
    /// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`].
    #[inline]
    pub fn add_timeline(&mut self, chunk_timeline: TimeColumn) -> ChunkResult<()> {
        self.timelines
            .insert(*chunk_timeline.timeline.name(), chunk_timeline);
        self.sanity_check()
    }
}
847
848impl TimeColumn {
849    /// Creates a new [`TimeColumn`].
850    ///
851    /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`.
852    /// When left unspecified (`None`), it will be computed in O(n) time.
853    ///
854    /// For a row-oriented constructor, see [`Self::builder`].
855    pub fn new(is_sorted: Option<bool>, timeline: Timeline, times: ArrowScalarBuffer<i64>) -> Self {
856        re_tracing::profile_function_if!(1000 < times.len(), format!("{} times", times.len()));
857
858        let time_slice = times.as_ref();
859
860        let is_sorted =
861            is_sorted.unwrap_or_else(|| time_slice.windows(2).all(|times| times[0] <= times[1]));
862
863        let time_range = if is_sorted {
864            // NOTE: The 'or' in 'map_or' is never hit, but better safe than sorry.
865            let min_time = time_slice
866                .first()
867                .copied()
868                .map_or(TimeInt::MIN, TimeInt::new_temporal);
869            let max_time = time_slice
870                .last()
871                .copied()
872                .map_or(TimeInt::MAX, TimeInt::new_temporal);
873            AbsoluteTimeRange::new(min_time, max_time)
874        } else {
875            // NOTE: Do the iteration multiple times in a cache-friendly way rather than the opposite.
876            // NOTE: The 'or' in 'unwrap_or' is never hit, but better safe than sorry.
877            let min_time = time_slice
878                .iter()
879                .min()
880                .copied()
881                .map_or(TimeInt::MIN, TimeInt::new_temporal);
882            let max_time = time_slice
883                .iter()
884                .max()
885                .copied()
886                .map_or(TimeInt::MAX, TimeInt::new_temporal);
887            AbsoluteTimeRange::new(min_time, max_time)
888        };
889
890        Self {
891            timeline,
892            times,
893            is_sorted,
894            time_range,
895        }
896    }
897
898    /// Creates a new [`TimeColumn`] of sequence type.
899    pub fn new_sequence(
900        name: impl Into<re_log_types::TimelineName>,
901        times: impl IntoIterator<Item = impl Into<i64>>,
902    ) -> Self {
903        let time_vec: Vec<_> = times.into_iter().map(|t| {
904            let t = t.into();
905            TimeInt::try_from(t)
906                .unwrap_or_else(|_| {
907                    re_log::error!(
908                illegal_value = t,
909                new_value = TimeInt::MIN.as_i64(),
910                "TimeColumn::new_sequence() called with illegal value - clamped to minimum legal value"
911            );
912                    TimeInt::MIN
913                })
914                .as_i64()
915        }).collect();
916
917        Self::new(
918            None,
919            Timeline::new_sequence(name.into()),
920            ArrowScalarBuffer::from(time_vec),
921        )
922    }
923
924    /// Creates a new [`TimeColumn`] of duration type, in seconds.
925    pub fn new_duration_secs(
926        name: impl Into<re_log_types::TimelineName>,
927        seconds: impl IntoIterator<Item = impl Into<f64>>,
928    ) -> Self {
929        let time_vec = seconds.into_iter().map(|seconds| {
930            let seconds = seconds.into();
931            let nanos = (1e9 * seconds).round();
932            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
933            if clamped.get() as f64 != nanos {
934                re_log::warn!(
935                    illegal_value = nanos,
936                    new_value = clamped.get(),
937                    "TimeColumn::new_duration_secs() called with out-of-range value. Clamped to valid range."
938                );
939            }
940            clamped.get()
941        }).collect_vec();
942
943        Self::new(
944            None,
945            Timeline::new(name, TimeType::DurationNs),
946            ArrowScalarBuffer::from(time_vec),
947        )
948    }
949
    /// Creates a new [`TimeColumn`] of timestamp type, in seconds since the Unix epoch.
    ///
    /// Out-of-range values are clamped to the valid range, with a warning logged.
    pub fn new_timestamp_secs_since_epoch(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        let time_vec = seconds.into_iter().map(|seconds| {
            let seconds = seconds.into();
            // Convert seconds to integer nanoseconds, saturating at the legal bounds.
            let nanos = (1e9 * seconds).round();
            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
            // A failed round-trip back to f64 means saturation (or NaN) occurred.
            if clamped.get() as f64 != nanos {
                re_log::warn!(
                    illegal_value = nanos,
                    new_value = clamped.get(),
                    "TimeColumn::new_timestamp_secs_since_epoch() called with out-of-range value. Clamped to valid range."
                );
            }
            clamped.get()
        }).collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::TimestampNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }
975
    /// Creates a new [`TimeColumn`] of duration type, in seconds.
    ///
    /// Deprecated alias for [`Self::new_duration_secs`].
    #[deprecated = "Use `TimeColumn::new_duration_secs` or `new_timestamp_secs_since_epoch` instead"]
    pub fn new_seconds(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        Self::new_duration_secs(name, seconds)
    }
984
985    /// Creates a new [`TimeColumn`] measuring duration in nanoseconds.
986    pub fn new_duration_nanos(
987        name: impl Into<re_log_types::TimelineName>,
988        nanos: impl IntoIterator<Item = impl Into<i64>>,
989    ) -> Self {
990        let time_vec = nanos
991            .into_iter()
992            .map(|nanos| {
993                let nanos = nanos.into();
994                NonMinI64::new(nanos)
995                    .unwrap_or_else(|| {
996                        re_log::error!(
997                            illegal_value = nanos,
998                            new_value = TimeInt::MIN.as_i64(),
999                            "TimeColumn::new_duration_nanos() called with illegal value - clamped to minimum legal value"
1000                        );
1001                        NonMinI64::MIN
1002                    })
1003                    .get()
1004            })
1005            .collect_vec();
1006
1007        Self::new(
1008            None,
1009            Timeline::new(name, TimeType::DurationNs),
1010            ArrowScalarBuffer::from(time_vec),
1011        )
1012    }
1013
1014    /// Creates a new [`TimeColumn`] of timestamps, as nanoseconds since unix epoch.
1015    pub fn new_timestamp_nanos_since_epoch(
1016        name: impl Into<re_log_types::TimelineName>,
1017        nanos: impl IntoIterator<Item = impl Into<i64>>,
1018    ) -> Self {
1019        let time_vec = nanos
1020            .into_iter()
1021            .map(|nanos| {
1022                let nanos = nanos.into();
1023                NonMinI64::new(nanos)
1024                    .unwrap_or_else(|| {
1025                        re_log::error!(
1026                            illegal_value = nanos,
1027                            new_value = TimeInt::MIN.as_i64(),
1028                            "TimeColumn::new_timestamp_nanos_since_epoch() called with illegal value - clamped to minimum legal value"
1029                        );
1030                        NonMinI64::MIN
1031                    })
1032                    .get()
1033            })
1034            .collect_vec();
1035
1036        Self::new(
1037            None,
1038            Timeline::new(name, TimeType::TimestampNs),
1039            ArrowScalarBuffer::from(time_vec),
1040        )
1041    }
1042
    /// Creates a new [`TimeColumn`] of nanoseconds type.
    ///
    /// Deprecated alias for [`Self::new_duration_nanos`].
    #[deprecated = "Use `TimeColumn::new_duration_nanos` or `new_timestamp_nanos_since_epoch` instead"]
    pub fn new_nanos(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        Self::new_duration_nanos(name, nanos)
    }
1051
1052    /// Parse the given [`ArrowArray`] as a time column.
1053    ///
1054    /// Results in an error if the array is of the wrong datatype, or if it contains nulls.
1055    pub fn read_array(array: &dyn ArrowArray) -> Result<ArrowScalarBuffer<i64>, TimeColumnError> {
1056        if array.null_count() > 0 {
1057            Err(TimeColumnError::ContainsNulls)
1058        } else {
1059            Self::read_nullable_array(array).map(|(times, _nulls)| times)
1060        }
1061    }
1062
1063    /// Parse the given [`ArrowArray`] as a time column where null values are acceptable.
1064    ///
1065    /// Results in an error if the array is of the wrong datatype.
1066    pub fn read_nullable_array(
1067        array: &dyn ArrowArray,
1068    ) -> Result<(ArrowScalarBuffer<i64>, Option<ArrowNullBuffer>), TimeColumnError> {
1069        // Sequence timelines are i64, but time columns are nanoseconds (also as i64).
1070        if let Some(times) = array.downcast_array_ref::<arrow::array::Int64Array>() {
1071            Ok((times.values().clone(), times.nulls().cloned()))
1072        } else if let Some(times) =
1073            array.downcast_array_ref::<arrow::array::TimestampNanosecondArray>()
1074        {
1075            Ok((times.values().clone(), times.nulls().cloned()))
1076        } else if let Some(times) =
1077            array.downcast_array_ref::<arrow::array::Time64NanosecondArray>()
1078        {
1079            Ok((times.values().clone(), times.nulls().cloned()))
1080        } else if let Some(times) =
1081            array.downcast_array_ref::<arrow::array::DurationNanosecondArray>()
1082        {
1083            Ok((times.values().clone(), times.nulls().cloned()))
1084        } else {
1085            Err(TimeColumnError::UnsupportedDataType(
1086                array.data_type().clone(),
1087            ))
1088        }
1089    }
1090}
1091
1092// ---
1093
impl Chunk {
    /// The unique identifier of this chunk.
    #[inline]
    pub fn id(&self) -> ChunkId {
        self.id
    }

    /// The [`EntityPath`] that every row in this chunk relates to.
    #[inline]
    pub fn entity_path(&self) -> &EntityPath {
        &self.entity_path
    }

    /// How many columns in total? Includes control, time, and component columns.
    #[inline]
    pub fn num_columns(&self) -> usize {
        let Self {
            id: _,
            entity_path: _, // not an actual column
            heap_size_bytes: _,
            is_sorted: _,
            row_ids: _,
            timelines,
            components,
        } = self;

        1 /* row_ids */ + timelines.len() + components.len()
    }

    /// How many control columns? Currently always 1: the row-id column.
    #[inline]
    pub fn num_controls(&self) -> usize {
        _ = self;
        1 /* row_ids */
    }

    /// How many time columns?
    #[inline]
    pub fn num_timelines(&self) -> usize {
        self.timelines.len()
    }

    /// How many component columns?
    #[inline]
    pub fn num_components(&self) -> usize {
        self.components.len()
    }

    /// How many rows? All columns in a sane chunk share this length (see [`Self::sanity_check`]).
    #[inline]
    pub fn num_rows(&self) -> usize {
        self.row_ids.len()
    }

    /// Does this chunk contain zero rows?
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// The raw arrow array backing the row-id column.
    #[inline]
    pub fn row_ids_array(&self) -> &FixedSizeBinaryArray {
        &self.row_ids
    }

    /// The row-ids of this chunk, as a typed slice view over the arrow data.
    #[inline]
    pub fn row_ids_slice(&self) -> &[RowId] {
        RowId::slice_from_arrow(&self.row_ids)
    }

    /// All the [`RowId`] in this chunk.
    ///
    /// This could be in any order if this chunk is unsorted.
    #[inline]
    pub fn row_ids(&self) -> impl ExactSizeIterator<Item = RowId> + '_ {
        self.row_ids_slice().iter().copied()
    }

    /// Returns an iterator over the [`RowId`]s of a [`Chunk`], for a given component.
    ///
    /// This is different than [`Self::row_ids`]: it will only yield `RowId`s for rows at which
    /// there is data for the specified `component_descriptor`.
    #[inline]
    pub fn component_row_ids(
        &self,
        component_descriptor: &ComponentDescriptor,
    ) -> impl Iterator<Item = RowId> + '_ + use<'_> {
        // Unknown component: nothing to yield.
        let Some(list_array) = self.components.get(component_descriptor) else {
            return Either::Left(std::iter::empty());
        };

        let row_ids = self.row_ids();

        if let Some(validity) = list_array.nulls() {
            // Sparse column: only yield row-ids whose cell is valid (non-null).
            Either::Right(Either::Left(
                row_ids
                    .enumerate()
                    .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
            ))
        } else {
            // Dense column: every row has data.
            Either::Right(Either::Right(row_ids))
        }
    }

    /// Returns the [`RowId`]-range covered by this [`Chunk`].
    ///
    /// `None` if the chunk `is_empty`.
    ///
    /// This is O(1) if the chunk is sorted, O(n) otherwise.
    #[inline]
    pub fn row_id_range(&self) -> Option<(RowId, RowId)> {
        if self.is_empty() {
            return None;
        }

        let row_ids = self.row_ids_slice();

        #[expect(clippy::unwrap_used)] // checked above
        Some(if self.is_sorted() {
            (
                row_ids.first().copied().unwrap(),
                row_ids.last().copied().unwrap(),
            )
        } else {
            (
                row_ids.iter().min().copied().unwrap(),
                row_ids.iter().max().copied().unwrap(),
            )
        })
    }

    /// Is this chunk static, i.e. does it have no time columns at all?
    #[inline]
    pub fn is_static(&self) -> bool {
        self.timelines.is_empty()
    }

    /// All time columns of this chunk, keyed by timeline name.
    #[inline]
    pub fn timelines(&self) -> &IntMap<TimelineName, TimeColumn> {
        &self.timelines
    }

    /// The descriptors of all component columns in this chunk (cloned).
    #[inline]
    pub fn component_descriptors(&self) -> impl Iterator<Item = ComponentDescriptor> + '_ {
        self.components.keys().cloned()
    }

    /// All component columns of this chunk.
    #[inline]
    pub fn components(&self) -> &ChunkComponents {
        &self.components
    }
}
1238
1239impl std::fmt::Display for Chunk {
1240    #[inline]
1241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1242        let batch = self.to_record_batch().map_err(|err| {
1243            re_log::error_once!("couldn't display Chunk: {err}");
1244            std::fmt::Error
1245        })?;
1246        re_format_arrow::format_record_batch_with_width(&batch, f.width(), f.sign_minus()).fmt(f)
1247    }
1248}
1249
impl TimeColumn {
    /// The [`Timeline`] this column indexes.
    #[inline]
    pub fn timeline(&self) -> &Timeline {
        &self.timeline
    }

    /// The name of the timeline this column indexes.
    #[inline]
    pub fn name(&self) -> &str {
        self.timeline.name()
    }

    /// The time range covered by this column as a whole.
    #[inline]
    pub fn time_range(&self) -> AbsoluteTimeRange {
        self.time_range
    }

    /// The raw arrow buffer backing this column.
    #[inline]
    pub fn times_buffer(&self) -> &ArrowScalarBuffer<i64> {
        &self.times
    }

    /// Returns an array with the appropriate datatype.
    #[inline]
    pub fn times_array(&self) -> ArrowArrayRef {
        self.timeline.typ().make_arrow_array(self.times.clone())
    }

    /// All times in a time column are guaranteed not to have the value `i64::MIN`
    /// (which is reserved for static data).
    #[inline]
    pub fn times_raw(&self) -> &[i64] {
        self.times.as_ref()
    }

    /// All times in a time column are guaranteed not to have the value `i64::MIN`
    /// (which is reserved for static data).
    #[inline]
    pub fn times_nonmin(&self) -> impl DoubleEndedIterator<Item = NonMinI64> + '_ {
        self.times_raw()
            .iter()
            .copied()
            .map(NonMinI64::saturating_from_i64)
    }

    /// All times of this column, as [`TimeInt`]s.
    #[inline]
    pub fn times(&self) -> impl DoubleEndedIterator<Item = TimeInt> + '_ {
        self.times_raw().iter().copied().map(TimeInt::new_temporal)
    }

    /// How many timestamps in this column?
    #[inline]
    pub fn num_rows(&self) -> usize {
        self.times.len()
    }

    /// Does this column contain zero timestamps?
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// Computes the time range covered by each individual component column.
    ///
    /// This is different from the time range covered by the [`TimeColumn`] as a whole
    /// because component columns are potentially sparse.
    ///
    /// This is crucial for indexing and queries to work properly.
    //
    // NOTE(review): the first/last-valid scans below only yield the true min/max if the
    // time column is sorted -- confirm that callers only invoke this on sorted chunks.
    //
    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
    pub fn time_range_per_component(
        &self,
        components: &ChunkComponents,
    ) -> IntMap<ComponentDescriptor, AbsoluteTimeRange> {
        let times = self.times_raw();
        components
            .iter()
            .filter_map(|(component_desc, list_array)| {
                if let Some(validity) = list_array.nulls() {
                    // Potentially sparse

                    // A zero-length validity bitmap means a completely empty column:
                    // nothing to report.
                    if validity.is_empty() {
                        return None;
                    }

                    // No nulls at all: the column is effectively dense, so the
                    // column-wide time range applies as-is.
                    let is_dense = validity.null_count() == 0;
                    if is_dense {
                        return Some((component_desc.clone(), self.time_range));
                    }

                    // Scan forward for the first valid (non-null) row…
                    let mut time_min = TimeInt::MAX;
                    for (i, time) in times.iter().copied().enumerate() {
                        if validity.is_valid(i) {
                            time_min = TimeInt::new_temporal(time);
                            break;
                        }
                    }

                    // …and backward for the last valid row.
                    // NOTE(review): a column whose rows are *all* null would fall through both
                    // loops and yield an inverted (MAX, MIN) range -- confirm this cannot
                    // happen upstream.
                    let mut time_max = TimeInt::MIN;
                    for (i, time) in times.iter().copied().enumerate().rev() {
                        if validity.is_valid(i) {
                            time_max = TimeInt::new_temporal(time);
                            break;
                        }
                    }

                    Some((
                        component_desc.clone(),
                        AbsoluteTimeRange::new(time_min, time_max),
                    ))
                } else {
                    // Dense

                    Some((component_desc.clone(), self.time_range))
                }
            })
            .collect()
    }
}
1366
1367impl re_byte_size::SizeBytes for Chunk {
1368    #[inline]
1369    fn heap_size_bytes(&self) -> u64 {
1370        let Self {
1371            id,
1372            entity_path,
1373            heap_size_bytes,
1374            is_sorted,
1375            row_ids,
1376            timelines,
1377            components,
1378        } = self;
1379
1380        let mut size_bytes = heap_size_bytes.load(Ordering::Relaxed);
1381
1382        if size_bytes == 0 {
1383            size_bytes = id.heap_size_bytes()
1384                + entity_path.heap_size_bytes()
1385                + is_sorted.heap_size_bytes()
1386                + row_ids.heap_size_bytes()
1387                + timelines.heap_size_bytes()
1388                + components.heap_size_bytes();
1389            heap_size_bytes.store(size_bytes, Ordering::Relaxed);
1390        }
1391
1392        size_bytes
1393    }
1394}
1395
1396impl re_byte_size::SizeBytes for TimeColumn {
1397    #[inline]
1398    fn heap_size_bytes(&self) -> u64 {
1399        let Self {
1400            timeline,
1401            times,
1402            is_sorted,
1403            time_range,
1404        } = self;
1405
1406        timeline.heap_size_bytes()
1407            + times.heap_size_bytes() // cheap
1408            + is_sorted.heap_size_bytes()
1409            + time_range.heap_size_bytes()
1410    }
1411}
1412
1413// --- Sanity checks ---
1414
1415impl Chunk {
1416    /// Returns an error if the Chunk's invariants are not upheld.
1417    ///
1418    /// Costly checks are only run in debug builds.
1419    #[track_caller]
1420    pub fn sanity_check(&self) -> ChunkResult<()> {
1421        re_tracing::profile_function!();
1422
1423        let Self {
1424            id: _,
1425            entity_path: _,
1426            heap_size_bytes,
1427            is_sorted,
1428            row_ids,
1429            timelines,
1430            components,
1431        } = self;
1432
1433        if cfg!(debug_assertions) {
1434            let measured = self.heap_size_bytes();
1435            let advertised = heap_size_bytes.load(Ordering::Relaxed);
1436            if advertised != measured {
1437                return Err(ChunkError::Malformed {
1438                    reason: format!(
1439                        "Chunk advertises a heap size of {} but we measure {} instead",
1440                        re_format::format_bytes(advertised as _),
1441                        re_format::format_bytes(measured as _),
1442                    ),
1443                });
1444            }
1445        }
1446
1447        // Row IDs
1448        {
1449            if *row_ids.data_type() != RowId::arrow_datatype() {
1450                return Err(ChunkError::Malformed {
1451                    reason: format!(
1452                        "RowId data has the wrong datatype: expected {:?} but got {:?} instead",
1453                        RowId::arrow_datatype(),
1454                        *row_ids.data_type(),
1455                    ),
1456                });
1457            }
1458
1459            #[expect(clippy::collapsible_if)] // readability
1460            if cfg!(debug_assertions) {
1461                if *is_sorted != self.is_sorted_uncached() {
1462                    return Err(ChunkError::Malformed {
1463                        reason: format!(
1464                            "Chunk is marked as {}sorted but isn't: {row_ids:?}",
1465                            if *is_sorted { "" } else { "un" },
1466                        ),
1467                    });
1468                }
1469            }
1470        }
1471
1472        // Timelines
1473        for (timeline_name, time_column) in timelines {
1474            if time_column.times.len() != row_ids.len() {
1475                return Err(ChunkError::Malformed {
1476                    reason: format!(
1477                        "All timelines in a chunk must have the same number of timestamps, matching the number of row IDs. \
1478                         Found {} row IDs but {} timestamps for timeline '{timeline_name}'",
1479                        row_ids.len(),
1480                        time_column.times.len(),
1481                    ),
1482                });
1483            }
1484
1485            time_column.sanity_check()?;
1486        }
1487
1488        // Components
1489
1490        for (component_desc, list_array) in components.iter() {
1491            if let Some(c) = component_desc.component_type {
1492                c.sanity_check();
1493            }
1494            // Ensure that each cell is a list (we don't support mono-components yet).
1495            if let arrow::datatypes::DataType::List(_field) = list_array.data_type() {
1496                // We don't check `field.is_nullable()` here because we support both.
1497                // TODO(#6819): Remove support for inner nullability.
1498            } else {
1499                return Err(ChunkError::Malformed {
1500                    reason: format!(
1501                        "The inner array in a chunked component batch must be a list, got {:?}",
1502                        list_array.data_type(),
1503                    ),
1504                });
1505            }
1506
1507            if list_array.len() != row_ids.len() {
1508                return Err(ChunkError::Malformed {
1509                    reason: format!(
1510                        "All component batches in a chunk must have the same number of rows, matching the number of row IDs. \
1511                             Found {} row IDs but {} rows for component batch {component_desc}",
1512                        row_ids.len(),
1513                        list_array.len(),
1514                    ),
1515                });
1516            }
1517
1518            let validity_is_empty = list_array
1519                .nulls()
1520                .is_some_and(|validity| validity.is_empty());
1521            if !self.is_empty() && validity_is_empty {
1522                return Err(ChunkError::Malformed {
1523                    reason: format!(
1524                        "All component batches in a chunk must contain at least one non-null entry.\
1525                             Found a completely empty column for {component_desc}",
1526                    ),
1527                });
1528            }
1529        }
1530
1531        Ok(())
1532    }
1533}
1534
1535impl TimeColumn {
1536    /// Returns an error if the Chunk's invariants are not upheld.
1537    ///
1538    /// Costly checks are only run in debug builds.
1539    #[track_caller]
1540    pub fn sanity_check(&self) -> ChunkResult<()> {
1541        let Self {
1542            timeline: _,
1543            times,
1544            is_sorted,
1545            time_range,
1546        } = self;
1547
1548        let times = times.as_ref();
1549
1550        if cfg!(debug_assertions)
1551            && *is_sorted != times.windows(2).all(|times| times[0] <= times[1])
1552        {
1553            return Err(ChunkError::Malformed {
1554                reason: format!(
1555                    "Time column is marked as {}sorted but isn't: {times:?}",
1556                    if *is_sorted { "" } else { "un" },
1557                ),
1558            });
1559        }
1560
1561        if cfg!(debug_assertions) {
1562            let is_tight_lower_bound = times.iter().any(|&time| time == time_range.min().as_i64());
1563            let is_tight_upper_bound = times.iter().any(|&time| time == time_range.max().as_i64());
1564            let is_tight_bound = is_tight_lower_bound && is_tight_upper_bound;
1565
1566            if !self.is_empty() && !is_tight_bound {
1567                return Err(ChunkError::Malformed {
1568                    reason: "Time column's cached time range isn't a tight bound.".to_owned(),
1569                });
1570            }
1571
1572            for &time in times {
1573                if time < time_range.min().as_i64() || time > time_range.max().as_i64() {
1574                    return Err(ChunkError::Malformed {
1575                        reason: format!(
1576                            "Time column's cached time range is wrong.\
1577                             Found a time value of {time} while its time range is {time_range:?}",
1578                        ),
1579                    });
1580                }
1581
1582                if time == TimeInt::STATIC.as_i64() {
1583                    return Err(ChunkError::Malformed {
1584                        reason: "A chunk's timeline should never contain a static time value."
1585                            .to_owned(),
1586                    });
1587                }
1588            }
1589        }
1590
1591        Ok(())
1592    }
1593}