re_chunk/
chunk.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2
3use ahash::HashMap;
4use anyhow::Context as _;
5use arrow::{
6    array::{
7        Array as ArrowArray, ArrayRef as ArrowArrayRef, FixedSizeBinaryArray,
8        ListArray as ArrowListArray,
9    },
10    buffer::{NullBuffer as ArrowNullBuffer, ScalarBuffer as ArrowScalarBuffer},
11};
12use itertools::{Either, Itertools as _, izip};
13use nohash_hasher::IntMap;
14
15use re_arrow_util::ArrowArrayDowncastRef as _;
16use re_byte_size::SizeBytes as _;
17use re_log_types::{
18    EntityPath, NonMinI64, ResolvedTimeRange, TimeInt, TimeType, Timeline, TimelineName,
19};
20use re_types_core::{
21    ArchetypeName, ComponentDescriptor, ComponentType, DeserializationError, Loggable as _,
22    SerializationError, SerializedComponentColumn,
23};
24
25use crate::{ChunkId, RowId};
26
27// ---
28
/// Errors that can occur when creating/manipulating a [`Chunk`]s, directly or indirectly through
/// the use of a [`crate::ChunkBatcher`].
#[derive(thiserror::Error, Debug)]
pub enum ChunkError {
    /// The chunk's invariants are broken (see `Chunk::sanity_check`).
    #[error("Detected malformed Chunk: {reason}")]
    Malformed { reason: String },

    /// An error bubbled up from the underlying arrow crate.
    #[error("Arrow: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// An out-of-bounds access into one of the chunk's columns.
    #[error("{kind} index out of bounds: {index} (len={len})")]
    IndexOutOfBounds {
        /// Which kind of index was out of bounds (free-form description).
        kind: String,
        /// The length of the column/collection that was being accessed.
        len: usize,
        /// The offending index.
        index: usize,
    },

    /// Failed to serialize data into its arrow representation.
    #[error("Serialization: {0}")]
    Serialization(#[from] SerializationError),

    /// Failed to deserialize data from its arrow representation.
    #[error("Deserialization: {0}")]
    Deserialization(#[from] DeserializationError),

    #[error(transparent)]
    UnsupportedTimeType(#[from] re_sorbet::UnsupportedTimeType),

    #[error(transparent)]
    WrongDatatypeError(#[from] re_sorbet::WrongDatatypeError),

    #[error(transparent)]
    MismatchedChunkSchemaError(#[from] re_sorbet::MismatchedChunkSchemaError),

    #[error(transparent)]
    InvalidSorbetSchema(#[from] re_sorbet::SorbetError),
}

/// Convenience alias for `Result` with [`ChunkError`] as the error type.
pub type ChunkResult<T> = Result<T, ChunkError>;
66
67// ---
68
/// Maps each [`ComponentDescriptor`] to the column of data logged for that component.
///
/// Each column is a sparse `ListArray` with one entry per row of the owning [`Chunk`].
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ChunkComponents(pub IntMap<ComponentDescriptor, ArrowListArray>);
71
72impl ChunkComponents {
73    /// Returns all list arrays for the given component type.
74    ///
75    /// I.e semantically equivalent to `get("MyComponent:*.*")`
76    #[inline]
77    pub fn get_by_component_type(
78        &self,
79        component_type: ComponentType,
80    ) -> impl Iterator<Item = &ArrowListArray> {
81        self.0.iter().filter_map(move |(desc, array)| {
82            (desc.component_type == Some(component_type)).then_some(array)
83        })
84    }
85
86    /// Approximate equal, that ignores small numeric differences.
87    ///
88    /// Returns `Ok` if similar.
89    /// If there is a difference, a description of that difference is returned in `Err`.
90    /// We use [`anyhow`] to provide context.
91    ///
92    /// Useful for tests.
93    pub fn ensure_similar(left: &Self, right: &Self) -> anyhow::Result<()> {
94        anyhow::ensure!(left.len() == right.len());
95        for (descr, left_array) in left.iter() {
96            let Some(right_array) = right.get(descr) else {
97                anyhow::bail!("rhs is missing {descr:?}");
98            };
99            re_arrow_util::ensure_similar(&left_array.to_data(), &right_array.to_data())
100                .with_context(|| format!("Component {descr:?}"))?;
101        }
102        Ok(())
103    }
104
105    /// Whether any of the components in this chunk has the given name.
106    pub fn contains_component(&self, component_descr: &ComponentDescriptor) -> bool {
107        self.0.contains_key(component_descr)
108    }
109
110    /// Whether any of the components in this chunk is tagged with the given archetype name.
111    pub fn has_component_with_archetype(&self, archetype_name: ArchetypeName) -> bool {
112        self.0
113            .keys()
114            .any(|desc| desc.archetype == Some(archetype_name))
115    }
116}
117
// Allow a `ChunkComponents` to be used anywhere the underlying map is expected (read access).
impl std::ops::Deref for ChunkComponents {
    type Target = IntMap<ComponentDescriptor, ArrowListArray>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
126
// Mutable counterpart of the `Deref` impl above: exposes `insert`, `extend`, etc. directly.
impl std::ops::DerefMut for ChunkComponents {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
133
134impl FromIterator<(ComponentDescriptor, ArrowListArray)> for ChunkComponents {
135    #[inline]
136    fn from_iter<T: IntoIterator<Item = (ComponentDescriptor, ArrowListArray)>>(iter: T) -> Self {
137        let mut this = Self::default();
138        {
139            for (component_desc, list_array) in iter {
140                this.insert(component_desc, list_array);
141            }
142        }
143        this
144    }
145}
146
147impl FromIterator<SerializedComponentColumn> for ChunkComponents {
148    #[inline]
149    fn from_iter<T: IntoIterator<Item = SerializedComponentColumn>>(iter: T) -> Self {
150        let mut this = Self::default();
151        {
152            for serialized in iter {
153                this.insert(serialized.descriptor, serialized.list_array);
154            }
155        }
156        this
157    }
158}
159
/// Dense arrow-based storage of N rows of multi-component multi-temporal data for a specific entity.
///
/// This is our core datastructure for logging, storing, querying and transporting data around.
///
/// The chunk as a whole is always ascendingly sorted by [`RowId`] before it gets manipulated in any way.
/// Its time columns might or might not be ascendingly sorted, depending on how the data was logged.
///
/// This is the in-memory representation of a chunk, optimized for efficient manipulation of the
/// data within. For transport, see [`re_sorbet::ChunkBatch`] instead.
#[derive(Debug)]
pub struct Chunk {
    /// Uniquely identifies this chunk.
    pub(crate) id: ChunkId,

    /// The entity that every row in this chunk belongs to.
    pub(crate) entity_path: EntityPath,

    /// The heap size of this chunk in bytes.
    ///
    /// Must be cached as it is very costly to compute, and needs to be computed repeatedly on the
    /// hot path (e.g. during garbage collection).
    ///
    /// Atomic so it can be updated through a shared reference;
    /// deliberately ignored by the `PartialEq` impl below.
    pub(crate) heap_size_bytes: AtomicU64,

    /// Is the chunk as a whole sorted by [`RowId`]?
    pub(crate) is_sorted: bool,

    /// The respective [`RowId`]s for each row of data.
    pub(crate) row_ids: FixedSizeBinaryArray,

    /// The time columns.
    ///
    /// Each column must be the same length as `row_ids`.
    ///
    /// Empty if this is a static chunk.
    pub(crate) timelines: IntMap<TimelineName, TimeColumn>,

    /// A sparse `ListArray` for each component.
    ///
    /// Each `ListArray` must be the same length as `row_ids`.
    ///
    /// Sparse so that we can e.g. log a `Position` at one timestamp but not a `Color`.
    pub(crate) components: ChunkComponents,
}
201
202impl PartialEq for Chunk {
203    #[inline]
204    fn eq(&self, other: &Self) -> bool {
205        let Self {
206            id,
207            entity_path,
208            heap_size_bytes: _,
209            is_sorted,
210            row_ids,
211            timelines,
212            components,
213        } = self;
214
215        *id == other.id
216            && *entity_path == other.entity_path
217            && *is_sorted == other.is_sorted
218            && *row_ids == other.row_ids
219            && *timelines == other.timelines
220            && *components == other.components
221    }
222}
223
224impl Chunk {
225    /// Returns a version of us with a new [`ChunkId`].
226    ///
227    /// Reminder:
228    /// * The returned [`Chunk`] will re-use the exact same [`RowId`]s as `self`.
229    /// * Duplicated [`RowId`]s in the `ChunkStore` is undefined behavior.
230    #[must_use]
231    #[inline]
232    pub fn with_id(mut self, id: ChunkId) -> Self {
233        self.id = id;
234        self
235    }
236
    /// Returns `Ok` if two [`Chunk`]s are _similar_, although not byte-for-byte equal.
    ///
    /// In particular, this ignores chunks and row IDs, as well as `log_time` timestamps.
    /// It also forgives small numeric inaccuracies in floating point buffers.
    ///
    /// If there is a difference, a description of that difference is returned in `Err`.
    /// We use [`anyhow`] to provide context.
    ///
    /// Useful for tests.
    pub fn ensure_similar(lhs: &Self, rhs: &Self) -> anyhow::Result<()> {
        // Destructure so that adding a field to `Chunk` forces this function to be revisited.
        let Self {
            id: _,
            entity_path,
            heap_size_bytes: _,
            is_sorted: _,
            row_ids: _,
            timelines,
            components,
        } = lhs;

        anyhow::ensure!(*entity_path == rhs.entity_path);

        // Same timeline keys, in the same iteration order.
        anyhow::ensure!(timelines.keys().collect_vec() == rhs.timelines.keys().collect_vec());

        for (timeline, left_time_col) in timelines {
            let right_time_col = rhs
                .timelines
                .get(timeline)
                .ok_or_else(|| anyhow::format_err!("right is missing timeline {timeline:?}"))?;
            if timeline == &TimelineName::log_time() {
                continue; // We expect this to differ
            }
            if timeline == "sim_time" {
                continue; // Small numeric differences
            }
            anyhow::ensure!(
                left_time_col == right_time_col,
                "Timeline differs: {timeline:?}"
            );
        }

        // Handle edge case: recording time on partition properties should ignore start time.
        if entity_path == &EntityPath::properties() {
            // We're going to filter out some components on both lhs and rhs.
            // Therefore, it's important that we first check that the number of components is the same.
            anyhow::ensure!(components.len() == rhs.components.len());

            // Copied from `rerun.archetypes.RecordingInfo`.
            let recording_time_descriptor = ComponentDescriptor {
                archetype: Some("rerun.archetypes.RecordingInfo".into()),
                component: "RecordingInfo:start_time".into(),
                component_type: Some("rerun.components.Timestamp".into()),
            };

            // Filter out the recording time component from both lhs and rhs.
            let lhs_components = components
                .iter()
                .filter(|&(desc, _list_array)| (desc != &recording_time_descriptor))
                .map(|(desc, list_array)| (desc.clone(), list_array.clone()))
                .collect::<IntMap<_, _>>();
            let rhs_components = rhs
                .components
                .iter()
                .filter(|&(desc, _list_array)| (desc != &recording_time_descriptor))
                .map(|(desc, list_array)| (desc.clone(), list_array.clone()))
                .collect::<IntMap<_, _>>();

            // NOTE: exact equality here, unlike the forgiving comparison below.
            anyhow::ensure!(lhs_components == rhs_components);
            Ok(())
        } else {
            ChunkComponents::ensure_similar(components, &rhs.components)
        }
    }
310
311    // Only used for tests atm
312    pub fn are_equal(&self, other: &Self) -> bool {
313        let Self {
314            id,
315            entity_path,
316            heap_size_bytes: _,
317            is_sorted,
318            row_ids,
319            timelines,
320            components,
321        } = self;
322
323        let my_components: IntMap<_, _> = components
324            .iter()
325            .map(|(descr, list_array)| (descr.clone(), list_array))
326            .collect();
327
328        let other_components: IntMap<_, _> = other
329            .components
330            .iter()
331            .map(|(descr, list_array)| (descr.clone(), list_array))
332            .collect();
333
334        *id == other.id
335            && *entity_path == other.entity_path
336            && *is_sorted == other.is_sorted
337            && row_ids == &other.row_ids
338            && *timelines == other.timelines
339            && my_components == other_components
340    }
341}
342
// Manual implementation because `AtomicU64` is not `Clone`.
impl Clone for Chunk {
    #[inline]
    fn clone(&self) -> Self {
        Self {
            id: self.id,
            entity_path: self.entity_path.clone(),
            // Carry the cached heap size over into a fresh atomic.
            heap_size_bytes: AtomicU64::new(self.heap_size_bytes.load(Ordering::Relaxed)),
            is_sorted: self.is_sorted,
            row_ids: self.row_ids.clone(),
            timelines: self.timelines.clone(),
            components: self.components.clone(),
        }
    }
}
357
358impl Chunk {
359    /// Clones the chunk and assign new IDs to the resulting chunk and its rows.
360    ///
361    /// `first_row_id` will become the [`RowId`] of the first row in the duplicated chunk.
362    /// Each row after that will be monotonically increasing.
363    #[inline]
364    pub fn clone_as(&self, id: ChunkId, first_row_id: RowId) -> Self {
365        let row_ids = std::iter::from_fn({
366            let mut row_id = first_row_id;
367            move || {
368                let yielded = row_id;
369                row_id = row_id.next();
370                Some(yielded)
371            }
372        })
373        .take(self.row_ids.len())
374        .collect_vec();
375
376        Self {
377            id,
378            row_ids: RowId::arrow_from_slice(&row_ids),
379            ..self.clone()
380        }
381    }
382
383    /// Clones the chunk into a new chunk without any time data.
384    #[inline]
385    pub fn into_static(mut self) -> Self {
386        self.timelines.clear();
387        self
388    }
389
390    /// Clones the chunk into a new chunk where all [`RowId`]s are [`RowId::ZERO`].
391    pub fn zeroed(self) -> Self {
392        let row_ids = std::iter::repeat(RowId::ZERO)
393            .take(self.row_ids.len())
394            .collect_vec();
395
396        let row_ids = RowId::arrow_from_slice(&row_ids);
397
398        Self { row_ids, ..self }
399    }
400
401    /// Computes the time range covered by each individual component column on each timeline.
402    ///
403    /// This is different from the time range covered by the [`Chunk`] as a whole because component
404    /// columns are potentially sparse.
405    ///
406    /// This is crucial for indexing and queries to work properly.
407    //
408    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
409    #[inline]
410    pub fn time_range_per_component(
411        &self,
412    ) -> IntMap<TimelineName, IntMap<ComponentDescriptor, ResolvedTimeRange>> {
413        re_tracing::profile_function!();
414
415        self.timelines
416            .iter()
417            .map(|(timeline_name, time_column)| {
418                (
419                    *timeline_name,
420                    time_column.time_range_per_component(&self.components),
421                )
422            })
423            .collect()
424    }
425
426    /// The cumulative number of events in this chunk.
427    ///
428    /// I.e. how many _component batches_ ("cells") were logged in total?
429    //
430    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
431    #[inline]
432    pub fn num_events_cumulative(&self) -> u64 {
433        // Reminder: component columns are sparse, we must take a look at the validity bitmaps.
434        self.components
435            .values()
436            .map(|list_array| {
437                list_array.nulls().map_or_else(
438                    || list_array.len() as u64,
439                    |validity| validity.len() as u64 - validity.null_count() as u64,
440                )
441            })
442            .sum()
443    }
444
    /// The cumulative number of events in this chunk for each _unique_ timestamp.
    ///
    /// I.e. how many _component batches_ ("cells") were logged in total at each timestamp?
    ///
    /// Keep in mind that a timestamp can appear multiple times in a [`Chunk`].
    /// This method will do a sum accumulation to account for these cases (i.e. every timestamp in
    /// the returned vector is guaranteed to be unique).
    pub fn num_events_cumulative_per_unique_time(
        &self,
        timeline: &TimelineName,
    ) -> Vec<(TimeInt, u64)> {
        re_tracing::profile_function!();

        // Static chunks have no timelines: account everything to `TimeInt::STATIC`.
        if self.is_static() {
            return vec![(TimeInt::STATIC, self.num_events_cumulative())];
        }

        let Some(time_column) = self.timelines().get(timeline) else {
            return Vec::new();
        };

        // Fast path: all rows share a single timestamp, no per-row accumulation needed.
        let time_range = time_column.time_range();
        if time_range.min() == time_range.max() {
            return vec![(time_range.min(), self.num_events_cumulative())];
        }

        // The sorted path accumulates in a single linear pass; the unsorted one needs a map.
        let counts = if time_column.is_sorted() {
            self.num_events_cumulative_per_unique_time_sorted(time_column)
        } else {
            self.num_events_cumulative_per_unique_time_unsorted(time_column)
        };

        // Postcondition: timestamps must come out unique and strictly ascending.
        debug_assert!(
            counts
                .iter()
                .tuple_windows::<(_, _)>()
                .all(|((time1, _), (time2, _))| time1 < time2)
        );

        counts
    }
486
    fn num_events_cumulative_per_unique_time_sorted(
        &self,
        time_column: &TimeColumn,
    ) -> Vec<(TimeInt, u64)> {
        re_tracing::profile_function!();

        debug_assert!(time_column.is_sorted());

        // NOTE: This is used on some very hot paths (time panel rendering).
        // Performance trumps readability. Optimized empirically.

        // Raw, potentially duplicated counts (because timestamps aren't necessarily unique).
        // `counts_raw[i]` = number of valid cells across all component columns in row `i`.
        let mut counts_raw = vec![0u64; self.num_rows()];
        {
            self.components.values().for_each(|list_array| {
                if let Some(validity) = list_array.nulls() {
                    // Sparse column: each set validity bit contributes one event.
                    validity
                        .iter()
                        .enumerate()
                        .for_each(|(i, is_valid)| counts_raw[i] += is_valid as u64);
                } else {
                    // Dense column: every row contributes one event.
                    counts_raw.iter_mut().for_each(|count| *count += 1);
                }
            });
        }

        let mut counts = Vec::with_capacity(counts_raw.len());

        let Some(mut cur_time) = time_column.times().next() else {
            return Vec::new(); // no rows at all
        };
        // Collapse consecutive runs of identical timestamps -- valid because the column is sorted.
        let mut cur_count = 0;
        izip!(time_column.times(), counts_raw).for_each(|(time, count)| {
            if time == cur_time {
                cur_count += count;
            } else {
                counts.push((cur_time, cur_count));
                cur_count = count;
                cur_time = time;
            }
        });

        // Flush the final run if the loop above didn't get to push it.
        if counts.last().map(|(time, _)| *time) != Some(cur_time) {
            counts.push((cur_time, cur_count));
        }

        counts
    }
535
536    fn num_events_cumulative_per_unique_time_unsorted(
537        &self,
538        time_column: &TimeColumn,
539    ) -> Vec<(TimeInt, u64)> {
540        re_tracing::profile_function!();
541
542        debug_assert!(!time_column.is_sorted());
543
544        // NOTE: This is used on some very hot paths (time panel rendering).
545
546        let result_unordered =
547            self.components
548                .values()
549                .fold(HashMap::default(), |acc, list_array| {
550                    if let Some(validity) = list_array.nulls() {
551                        time_column.times().zip(validity.iter()).fold(
552                            acc,
553                            |mut acc, (time, is_valid)| {
554                                *acc.entry(time).or_default() += is_valid as u64;
555                                acc
556                            },
557                        )
558                    } else {
559                        time_column.times().fold(acc, |mut acc, time| {
560                            *acc.entry(time).or_default() += 1;
561                            acc
562                        })
563                    }
564                });
565
566        let mut result = result_unordered.into_iter().collect_vec();
567        result.sort_by_key(|val| val.0);
568        result
569    }
570
571    /// The number of events in this chunk for the specified component.
572    ///
573    /// I.e. how many _component batches_ ("cells") were logged in total for this component?
574    //
575    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
576    #[inline]
577    pub fn num_events_for_component(
578        &self,
579        component_descriptor: &ComponentDescriptor,
580    ) -> Option<u64> {
581        // Reminder: component columns are sparse, we must check validity bitmap.
582        self.components.get(component_descriptor).map(|list_array| {
583            list_array.nulls().map_or_else(
584                || list_array.len() as u64,
585                |validity| validity.len() as u64 - validity.null_count() as u64,
586            )
587        })
588    }
589
590    /// Computes the `RowId` range covered by each individual component column on each timeline.
591    ///
592    /// This is different from the `RowId` range covered by the [`Chunk`] as a whole because component
593    /// columns are potentially sparse.
594    ///
595    /// This is crucial for indexing and queries to work properly.
596    //
597    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
598    pub fn row_id_range_per_component(&self) -> IntMap<ComponentDescriptor, (RowId, RowId)> {
599        re_tracing::profile_function!();
600
601        let row_ids = self.row_ids().collect_vec();
602
603        if self.is_sorted() {
604            self.components
605                .iter()
606                .filter_map(|(component_desc, list_array)| {
607                    let mut row_id_min = None;
608                    let mut row_id_max = None;
609
610                    for (i, &row_id) in row_ids.iter().enumerate() {
611                        if list_array.is_valid(i) {
612                            row_id_min = Some(row_id);
613                        }
614                    }
615                    for (i, &row_id) in row_ids.iter().enumerate().rev() {
616                        if list_array.is_valid(i) {
617                            row_id_max = Some(row_id);
618                        }
619                    }
620
621                    Some((component_desc.clone(), (row_id_min?, row_id_max?)))
622                })
623                .collect()
624        } else {
625            self.components
626                .iter()
627                .filter_map(|(component_desc, list_array)| {
628                    let mut row_id_min = Some(RowId::MAX);
629                    let mut row_id_max = Some(RowId::ZERO);
630
631                    for (i, &row_id) in row_ids.iter().enumerate() {
632                        if list_array.is_valid(i) && Some(row_id) > row_id_min {
633                            row_id_min = Some(row_id);
634                        }
635                    }
636                    for (i, &row_id) in row_ids.iter().enumerate().rev() {
637                        if list_array.is_valid(i) && Some(row_id) < row_id_max {
638                            row_id_max = Some(row_id);
639                        }
640                    }
641
642                    Some((component_desc.clone(), (row_id_min?, row_id_max?)))
643                })
644                .collect()
645        }
646    }
647}
648
649// ---
650
/// A single column of timestamps for one timeline of a [`Chunk`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeColumn {
    /// Which timeline these timestamps belong to.
    pub(crate) timeline: Timeline,

    /// Every single timestamp for this timeline.
    ///
    /// * This might or might not be sorted, depending on how the data was logged.
    /// * This is guaranteed to always be dense, because chunks are split anytime a timeline is
    ///   added or removed.
    /// * This cannot ever contain `TimeInt::STATIC`, since static data doesn't even have timelines.
    ///
    /// When this buffer is converted to an arrow array, its datatype will depend
    /// on the timeline type, so it will either become a
    /// [`arrow::array::Int64Array`] or a [`arrow::array::TimestampNanosecondArray`].
    pub(crate) times: ArrowScalarBuffer<i64>,

    /// Is [`Self::times`] sorted?
    ///
    /// This is completely independent of [`Chunk::is_sorted`]: a timeline doesn't necessarily
    /// follow the global [`RowId`]-based order, although it does in most cases (happy path).
    pub(crate) is_sorted: bool,

    /// The time range covered by [`Self::times`].
    ///
    /// Not necessarily contiguous! Just the min and max value found in [`Self::times`].
    pub(crate) time_range: ResolvedTimeRange,
}
678
679/// Errors when deserializing/parsing/reading a column of time data.
680#[derive(Debug, thiserror::Error)]
681pub enum TimeColumnError {
682    #[error("Time columns had nulls, but should be dense")]
683    ContainsNulls,
684
685    #[error("Unsupported data type : {0}")]
686    UnsupportedDataType(arrow::datatypes::DataType),
687}
688
689impl Chunk {
    /// Creates a new [`Chunk`].
    ///
    /// This will fail if the passed in data is malformed in any way -- see [`Self::sanity_check`]
    /// for details.
    ///
    /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`.
    /// When left unspecified (`None`), it will be computed in O(n) time.
    ///
    /// For a row-oriented constructor, see [`Self::builder`].
    pub fn new(
        id: ChunkId,
        entity_path: EntityPath,
        is_sorted: Option<bool>,
        row_ids: FixedSizeBinaryArray,
        timelines: IntMap<TimelineName, TimeColumn>,
        components: ChunkComponents,
    ) -> ChunkResult<Self> {
        // `is_sorted` is temporarily set to a placeholder so that `is_sorted_uncached()` below
        // can run on a fully-constructed chunk; it is overwritten right after.
        let mut chunk = Self {
            id,
            entity_path,
            heap_size_bytes: AtomicU64::new(0), // cache starts empty
            is_sorted: false,
            row_ids,
            timelines,
            components,
        };

        chunk.is_sorted = is_sorted.unwrap_or_else(|| chunk.is_sorted_uncached());

        // Reject malformed data before it can spread any further.
        chunk.sanity_check()?;

        Ok(chunk)
    }
723
724    /// Creates a new [`Chunk`].
725    ///
726    /// This will fail if the passed in data is malformed in any way -- see [`Self::sanity_check`]
727    /// for details.
728    ///
729    /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`.
730    /// When left unspecified (`None`), it will be computed in O(n) time.
731    ///
732    /// For a row-oriented constructor, see [`Self::builder`].
733    pub fn from_native_row_ids(
734        id: ChunkId,
735        entity_path: EntityPath,
736        is_sorted: Option<bool>,
737        row_ids: &[RowId],
738        timelines: IntMap<TimelineName, TimeColumn>,
739        components: ChunkComponents,
740    ) -> ChunkResult<Self> {
741        re_tracing::profile_function!();
742        let row_ids = RowId::arrow_from_slice(row_ids);
743        Self::new(id, entity_path, is_sorted, row_ids, timelines, components)
744    }
745
746    /// Creates a new [`Chunk`].
747    ///
748    /// This will fail if the passed in data is malformed in any way -- see [`Self::sanity_check`]
749    /// for details.
750    ///
751    /// The data is assumed to be sorted in `RowId`-order. Sequential `RowId`s will be generated for each
752    /// row in the chunk.
753    pub fn from_auto_row_ids(
754        id: ChunkId,
755        entity_path: EntityPath,
756        timelines: IntMap<TimelineName, TimeColumn>,
757        components: ChunkComponents,
758    ) -> ChunkResult<Self> {
759        let count = components
760            .iter()
761            .next()
762            .map_or(0, |(_, list_array)| list_array.len());
763
764        let row_ids = std::iter::from_fn({
765            let tuid: re_tuid::Tuid = *id;
766            let mut row_id = RowId::from_tuid(tuid.next());
767            move || {
768                let yielded = row_id;
769                row_id = row_id.next();
770                Some(yielded)
771            }
772        })
773        .take(count)
774        .collect_vec();
775
776        Self::from_native_row_ids(id, entity_path, Some(true), &row_ids, timelines, components)
777    }
778
779    /// Simple helper for [`Self::new`] for static data.
780    ///
781    /// For a row-oriented constructor, see [`Self::builder`].
782    #[inline]
783    pub fn new_static(
784        id: ChunkId,
785        entity_path: EntityPath,
786        is_sorted: Option<bool>,
787        row_ids: FixedSizeBinaryArray,
788        components: ChunkComponents,
789    ) -> ChunkResult<Self> {
790        Self::new(
791            id,
792            entity_path,
793            is_sorted,
794            row_ids,
795            Default::default(),
796            components,
797        )
798    }
799
800    #[inline]
801    pub fn empty(id: ChunkId, entity_path: EntityPath) -> Self {
802        Self {
803            id,
804            entity_path,
805            heap_size_bytes: Default::default(),
806            is_sorted: true,
807            row_ids: RowId::arrow_from_slice(&[]),
808            timelines: Default::default(),
809            components: Default::default(),
810        }
811    }
812
    /// Unconditionally inserts an [`ArrowListArray`] as a component column.
    ///
    /// Removes and replaces the column if it already exists.
    ///
    /// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`].
    #[inline]
    pub fn add_component(
        &mut self,
        component_desc: ComponentDescriptor,
        list_array: ArrowListArray,
    ) -> ChunkResult<()> {
        self.components.insert(component_desc, list_array);
        // NOTE: the column has already been inserted at this point; a failed sanity check
        // does not roll the insertion back.
        self.sanity_check()
    }
827
    /// Unconditionally inserts a [`TimeColumn`].
    ///
    /// Removes and replaces the column if it already exists.
    ///
    /// This will fail if the end result is malformed in any way -- see [`Self::sanity_check`].
    #[inline]
    pub fn add_timeline(&mut self, chunk_timeline: TimeColumn) -> ChunkResult<()> {
        // Keyed by timeline name: a second column for the same timeline replaces the first.
        self.timelines
            .insert(*chunk_timeline.timeline.name(), chunk_timeline);
        // NOTE: a failed sanity check does not roll the insertion back.
        self.sanity_check()
    }
839}
840
impl TimeColumn {
    /// Creates a new [`TimeColumn`].
    ///
    /// Iff you know for sure whether the data is already appropriately sorted or not, specify `is_sorted`.
    /// When left unspecified (`None`), it will be computed in O(n) time.
    ///
    /// For a row-oriented constructor, see [`Self::builder`].
    pub fn new(is_sorted: Option<bool>, timeline: Timeline, times: ArrowScalarBuffer<i64>) -> Self {
        re_tracing::profile_function_if!(1000 < times.len(), format!("{} times", times.len()));

        let time_slice = times.as_ref();

        // O(n) sortedness check, skipped entirely when the caller already knows.
        let is_sorted =
            is_sorted.unwrap_or_else(|| time_slice.windows(2).all(|times| times[0] <= times[1]));

        // Pre-compute and cache the covered time range: O(1) for sorted data, O(n) otherwise.
        let time_range = if is_sorted {
            // NOTE: The 'or' in 'map_or' is never hit, but better safe than sorry.
            let min_time = time_slice
                .first()
                .copied()
                .map_or(TimeInt::MIN, TimeInt::new_temporal);
            let max_time = time_slice
                .last()
                .copied()
                .map_or(TimeInt::MAX, TimeInt::new_temporal);
            ResolvedTimeRange::new(min_time, max_time)
        } else {
            // NOTE: Do the iteration multiple times in a cache-friendly way rather than the opposite.
            // NOTE: The 'or' in 'unwrap_or' is never hit, but better safe than sorry.
            let min_time = time_slice
                .iter()
                .min()
                .copied()
                .map_or(TimeInt::MIN, TimeInt::new_temporal);
            let max_time = time_slice
                .iter()
                .max()
                .copied()
                .map_or(TimeInt::MAX, TimeInt::new_temporal);
            ResolvedTimeRange::new(min_time, max_time)
        };

        Self {
            timeline,
            times,
            is_sorted,
            time_range,
        }
    }

    /// Creates a new [`TimeColumn`] of sequence type.
    ///
    /// Values outside the legal [`TimeInt`] range are clamped to the minimum legal value,
    /// and an error is logged.
    pub fn new_sequence(
        name: impl Into<re_log_types::TimelineName>,
        times: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec: Vec<_> = times.into_iter().map(|t| {
            let t = t.into();
            TimeInt::try_from(t)
                .unwrap_or_else(|_| {
                    re_log::error!(
                illegal_value = t,
                new_value = TimeInt::MIN.as_i64(),
                "TimeColumn::new_sequence() called with illegal value - clamped to minimum legal value"
            );
                    TimeInt::MIN
                })
                .as_i64()
        }).collect();

        Self::new(
            None,
            Timeline::new_sequence(name.into()),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Creates a new [`TimeColumn`] of duration type, in seconds.
    ///
    /// Values are converted to nanoseconds; out-of-range values are clamped and a warning is
    /// logged.
    pub fn new_duration_secs(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        let time_vec = seconds.into_iter().map(|seconds| {
            let seconds = seconds.into();
            let nanos = (1e9 * seconds).round();
            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
            // A round-trip mismatch means the value was saturated (or not representable as i64).
            if clamped.get() as f64 != nanos {
                re_log::warn!(
                    illegal_value = nanos,
                    new_value = clamped.get(),
                    "TimeColumn::new_duration_secs() called with out-of-range value. Clamped to valid range."
                );
            }
            clamped.get()
        }).collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::DurationNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Creates a new [`TimeColumn`] of timestamp type, in seconds since unix epoch.
    ///
    /// Values are converted to nanoseconds; out-of-range values are clamped and a warning is
    /// logged.
    pub fn new_timestamp_secs_since_epoch(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        let time_vec = seconds.into_iter().map(|seconds| {
            let seconds = seconds.into();
            let nanos = (1e9 * seconds).round();
            let clamped = NonMinI64::saturating_from_i64(nanos as i64);
            // A round-trip mismatch means the value was saturated (or not representable as i64).
            if clamped.get() as f64 != nanos {
                re_log::warn!(
                    illegal_value = nanos,
                    new_value = clamped.get(),
                    "TimeColumn::new_timestamp_secs_since_epoch() called with out-of-range value. Clamped to valid range."
                );
            }
            clamped.get()
        }).collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::TimestampNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Creates a new [`TimeColumn`] of duration type, in seconds.
    #[deprecated = "Use `TimeColumn::new_duration_secs` or `new_timestamp_secs_since_epoch` instead"]
    pub fn new_seconds(
        name: impl Into<re_log_types::TimelineName>,
        seconds: impl IntoIterator<Item = impl Into<f64>>,
    ) -> Self {
        Self::new_duration_secs(name, seconds)
    }

    /// Creates a new [`TimeColumn`] measuring duration in nanoseconds.
    ///
    /// Illegal values (`i64::MIN`, reserved) are clamped to the minimum legal value and an
    /// error is logged.
    pub fn new_duration_nanos(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec = nanos
            .into_iter()
            .map(|nanos| {
                let nanos = nanos.into();
                NonMinI64::new(nanos)
                    .unwrap_or_else(|| {
                        re_log::error!(
                            illegal_value = nanos,
                            new_value = TimeInt::MIN.as_i64(),
                            "TimeColumn::new_duration_nanos() called with illegal value - clamped to minimum legal value"
                        );
                        NonMinI64::MIN
                    })
                    .get()
            })
            .collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::DurationNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Creates a new [`TimeColumn`] of timestamps, as nanoseconds since unix epoch.
    ///
    /// Illegal values (`i64::MIN`, reserved) are clamped to the minimum legal value and an
    /// error is logged.
    pub fn new_timestamp_nanos_since_epoch(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        let time_vec = nanos
            .into_iter()
            .map(|nanos| {
                let nanos = nanos.into();
                NonMinI64::new(nanos)
                    .unwrap_or_else(|| {
                        re_log::error!(
                            illegal_value = nanos,
                            new_value = TimeInt::MIN.as_i64(),
                            "TimeColumn::new_timestamp_nanos_since_epoch() called with illegal value - clamped to minimum legal value"
                        );
                        NonMinI64::MIN
                    })
                    .get()
            })
            .collect_vec();

        Self::new(
            None,
            Timeline::new(name, TimeType::TimestampNs),
            ArrowScalarBuffer::from(time_vec),
        )
    }

    /// Creates a new [`TimeColumn`] of nanoseconds type.
    #[deprecated = "Use `TimeColumn::new_duration_nanos` or `new_timestamp_nanos_since_epoch` instead"]
    pub fn new_nanos(
        name: impl Into<re_log_types::TimelineName>,
        nanos: impl IntoIterator<Item = impl Into<i64>>,
    ) -> Self {
        Self::new_duration_nanos(name, nanos)
    }

    /// Parse the given [`ArrowArray`] as a time column.
    ///
    /// Results in an error if the array is of the wrong datatype, or if it contains nulls.
    pub fn read_array(array: &dyn ArrowArray) -> Result<ArrowScalarBuffer<i64>, TimeColumnError> {
        #![allow(clippy::manual_map)]

        if array.null_count() > 0 {
            Err(TimeColumnError::ContainsNulls)
        } else {
            // Nulls already ruled out above, so the validity bitmap can be dropped.
            Self::read_nullable_array(array).map(|(times, _nulls)| times)
        }
    }

    /// Parse the given [`ArrowArray`] as a time column where null values are acceptable.
    ///
    /// Results in an error if the array is of the wrong datatype.
    pub fn read_nullable_array(
        array: &dyn ArrowArray,
    ) -> Result<(ArrowScalarBuffer<i64>, Option<ArrowNullBuffer>), TimeColumnError> {
        #![allow(clippy::manual_map)]

        // Sequence timelines are i64, but time columns are nanoseconds (also as i64).
        if let Some(times) = array.downcast_array_ref::<arrow::array::Int64Array>() {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::TimestampNanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::Time64NanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else if let Some(times) =
            array.downcast_array_ref::<arrow::array::DurationNanosecondArray>()
        {
            Ok((times.values().clone(), times.nulls().cloned()))
        } else {
            Err(TimeColumnError::UnsupportedDataType(
                array.data_type().clone(),
            ))
        }
    }
}
1088
1089// ---
1090
impl Chunk {
    /// The unique identifier of this chunk.
    #[inline]
    pub fn id(&self) -> ChunkId {
        self.id
    }

    /// The entity path this chunk's data belongs to.
    #[inline]
    pub fn entity_path(&self) -> &EntityPath {
        &self.entity_path
    }

    /// How many columns in total? Includes control, time, and component columns.
    #[inline]
    pub fn num_columns(&self) -> usize {
        let Self {
            id: _,
            entity_path: _, // not an actual column
            heap_size_bytes: _,
            is_sorted: _,
            row_ids: _,
            timelines,
            components,
        } = self;

        1 /* row_ids */ + timelines.len() + components.len()
    }

    /// How many control columns? Currently always 1: the row IDs.
    #[inline]
    pub fn num_controls(&self) -> usize {
        _ = self;
        1 /* row_ids */
    }

    /// How many time columns?
    #[inline]
    pub fn num_timelines(&self) -> usize {
        self.timelines.len()
    }

    /// How many component columns?
    #[inline]
    pub fn num_components(&self) -> usize {
        self.components.len()
    }

    /// How many rows, i.e. how many row IDs?
    #[inline]
    pub fn num_rows(&self) -> usize {
        self.row_ids.len()
    }

    /// Does this chunk contain zero rows?
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// The raw Arrow array backing the row IDs.
    #[inline]
    pub fn row_ids_array(&self) -> &FixedSizeBinaryArray {
        &self.row_ids
    }

    /// The row IDs, viewed as a native `[RowId]` slice.
    #[inline]
    pub fn row_ids_slice(&self) -> &[RowId] {
        RowId::slice_from_arrow(&self.row_ids)
    }

    /// All the [`RowId`] in this chunk.
    ///
    /// This could be in any order if this chunk is unsorted.
    #[inline]
    pub fn row_ids(&self) -> impl ExactSizeIterator<Item = RowId> + '_ {
        self.row_ids_slice().iter().copied()
    }

    /// Returns an iterator over the [`RowId`]s of a [`Chunk`], for a given component.
    ///
    /// This is different than [`Self::row_ids`]: it will only yield `RowId`s for rows at which
    /// there is data for the specified `component_descriptor`.
    #[inline]
    pub fn component_row_ids(
        &self,
        component_descriptor: &ComponentDescriptor,
    ) -> impl Iterator<Item = RowId> + '_ + use<'_> {
        // No such component in this chunk: nothing to yield.
        let Some(list_array) = self.components.get(component_descriptor) else {
            return Either::Left(std::iter::empty());
        };

        let row_ids = self.row_ids();

        if let Some(validity) = list_array.nulls() {
            // Sparse column: only yield row IDs whose cell is valid (non-null).
            Either::Right(Either::Left(
                row_ids
                    .enumerate()
                    .filter_map(|(i, o)| validity.is_valid(i).then_some(o)),
            ))
        } else {
            // Dense column (no validity bitmap): every row has data for this component.
            Either::Right(Either::Right(row_ids))
        }
    }

    /// Returns the [`RowId`]-range covered by this [`Chunk`].
    ///
    /// `None` if the chunk `is_empty`.
    ///
    /// This is O(1) if the chunk is sorted, O(n) otherwise.
    #[inline]
    pub fn row_id_range(&self) -> Option<(RowId, RowId)> {
        if self.is_empty() {
            return None;
        }

        let row_ids = self.row_ids_slice();

        #[allow(clippy::unwrap_used)] // checked above
        Some(if self.is_sorted() {
            // Sorted: the extremes are simply the first and last entries.
            (
                row_ids.first().copied().unwrap(),
                row_ids.last().copied().unwrap(),
            )
        } else {
            // Unsorted: a full scan is required.
            (
                row_ids.iter().min().copied().unwrap(),
                row_ids.iter().max().copied().unwrap(),
            )
        })
    }

    /// Is this chunk static, i.e. does it have no time columns at all?
    #[inline]
    pub fn is_static(&self) -> bool {
        self.timelines.is_empty()
    }

    /// All the time columns in this chunk, keyed by timeline name.
    #[inline]
    pub fn timelines(&self) -> &IntMap<TimelineName, TimeColumn> {
        &self.timelines
    }

    /// The descriptors of all the component columns in this chunk.
    #[inline]
    pub fn component_descriptors(&self) -> impl Iterator<Item = ComponentDescriptor> + '_ {
        self.components.keys().cloned()
    }

    /// All the component columns in this chunk.
    #[inline]
    pub fn components(&self) -> &ChunkComponents {
        &self.components
    }
}
1235
1236impl std::fmt::Display for Chunk {
1237    #[inline]
1238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1239        let batch = self.to_record_batch().map_err(|err| {
1240            re_log::error_once!("couldn't display Chunk: {err}");
1241            std::fmt::Error
1242        })?;
1243        re_format_arrow::format_record_batch_with_width(&batch, f.width()).fmt(f)
1244    }
1245}
1246
impl TimeColumn {
    /// The [`Timeline`] this column is associated with.
    #[inline]
    pub fn timeline(&self) -> &Timeline {
        &self.timeline
    }

    /// The name of the underlying timeline.
    #[inline]
    pub fn name(&self) -> &str {
        self.timeline.name()
    }

    /// The time range covered by this column as a whole.
    #[inline]
    pub fn time_range(&self) -> ResolvedTimeRange {
        self.time_range
    }

    /// The raw buffer of time values.
    #[inline]
    pub fn times_buffer(&self) -> &ArrowScalarBuffer<i64> {
        &self.times
    }

    /// Returns an array with the appropriate datatype.
    #[inline]
    pub fn times_array(&self) -> ArrowArrayRef {
        self.timeline.typ().make_arrow_array(self.times.clone())
    }

    /// All times in a time column are guaranteed not to have the value `i64::MIN`
    /// (which is reserved for static data).
    #[inline]
    pub fn times_raw(&self) -> &[i64] {
        self.times.as_ref()
    }

    /// All times in a time column are guaranteed not to have the value `i64::MIN`
    /// (which is reserved for static data).
    #[inline]
    pub fn times_nonmin(&self) -> impl DoubleEndedIterator<Item = NonMinI64> + '_ {
        self.times_raw()
            .iter()
            .copied()
            .map(NonMinI64::saturating_from_i64)
    }

    /// Iterates over all time values, as [`TimeInt`]s.
    #[inline]
    pub fn times(&self) -> impl DoubleEndedIterator<Item = TimeInt> + '_ {
        self.times_raw().iter().copied().map(TimeInt::new_temporal)
    }

    /// How many rows, i.e. how many time values?
    #[inline]
    pub fn num_rows(&self) -> usize {
        self.times.len()
    }

    /// Does this column contain zero time values?
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }

    /// Computes the time range covered by each individual component column.
    ///
    /// This is different from the time range covered by the [`TimeColumn`] as a whole
    /// because component columns are potentially sparse.
    ///
    /// This is crucial for indexing and queries to work properly.
    //
    // TODO(cmc): This needs to be stored in chunk metadata and transported across IPC.
    pub fn time_range_per_component(
        &self,
        components: &ChunkComponents,
    ) -> IntMap<ComponentDescriptor, ResolvedTimeRange> {
        let times = self.times_raw();
        components
            .iter()
            .filter_map(|(component_desc, list_array)| {
                if let Some(validity) = list_array.nulls() {
                    // Potentially sparse

                    if validity.is_empty() {
                        return None;
                    }

                    // No nulls at all -> the column covers the full time range.
                    let is_dense = validity.null_count() == 0;
                    if is_dense {
                        return Some((component_desc.clone(), self.time_range));
                    }

                    // Forward scan: find the time of the first non-null entry.
                    let mut time_min = TimeInt::MAX;
                    for (i, time) in times.iter().copied().enumerate() {
                        if validity.is_valid(i) {
                            time_min = TimeInt::new_temporal(time);
                            break;
                        }
                    }

                    // Backward scan: find the time of the last non-null entry.
                    let mut time_max = TimeInt::MIN;
                    for (i, time) in times.iter().copied().enumerate().rev() {
                        if validity.is_valid(i) {
                            time_max = TimeInt::new_temporal(time);
                            break;
                        }
                    }

                    Some((
                        component_desc.clone(),
                        ResolvedTimeRange::new(time_min, time_max),
                    ))
                } else {
                    // Dense

                    Some((component_desc.clone(), self.time_range))
                }
            })
            .collect()
    }
}
1363
1364impl re_byte_size::SizeBytes for Chunk {
1365    #[inline]
1366    fn heap_size_bytes(&self) -> u64 {
1367        let Self {
1368            id,
1369            entity_path,
1370            heap_size_bytes,
1371            is_sorted,
1372            row_ids,
1373            timelines,
1374            components,
1375        } = self;
1376
1377        let mut size_bytes = heap_size_bytes.load(Ordering::Relaxed);
1378
1379        if size_bytes == 0 {
1380            size_bytes = id.heap_size_bytes()
1381                + entity_path.heap_size_bytes()
1382                + is_sorted.heap_size_bytes()
1383                + row_ids.heap_size_bytes()
1384                + timelines.heap_size_bytes()
1385                + components.heap_size_bytes();
1386            heap_size_bytes.store(size_bytes, Ordering::Relaxed);
1387        }
1388
1389        size_bytes
1390    }
1391}
1392
1393impl re_byte_size::SizeBytes for TimeColumn {
1394    #[inline]
1395    fn heap_size_bytes(&self) -> u64 {
1396        let Self {
1397            timeline,
1398            times,
1399            is_sorted,
1400            time_range,
1401        } = self;
1402
1403        timeline.heap_size_bytes()
1404            + times.heap_size_bytes() // cheap
1405            + is_sorted.heap_size_bytes()
1406            + time_range.heap_size_bytes()
1407    }
1408}
1409
1410// --- Sanity checks ---
1411
1412impl Chunk {
1413    /// Returns an error if the Chunk's invariants are not upheld.
1414    ///
1415    /// Costly checks are only run in debug builds.
1416    #[track_caller]
1417    pub fn sanity_check(&self) -> ChunkResult<()> {
1418        re_tracing::profile_function!();
1419
1420        let Self {
1421            id: _,
1422            entity_path: _,
1423            heap_size_bytes,
1424            is_sorted,
1425            row_ids,
1426            timelines,
1427            components,
1428        } = self;
1429
1430        #[allow(clippy::collapsible_if)] // readability
1431        if cfg!(debug_assertions) {
1432            let measured = self.heap_size_bytes();
1433            let advertised = heap_size_bytes.load(Ordering::Relaxed);
1434            if advertised != measured {
1435                return Err(ChunkError::Malformed {
1436                    reason: format!(
1437                        "Chunk advertises a heap size of {} but we measure {} instead",
1438                        re_format::format_bytes(advertised as _),
1439                        re_format::format_bytes(measured as _),
1440                    ),
1441                });
1442            }
1443        }
1444
1445        // Row IDs
1446        {
1447            if *row_ids.data_type() != RowId::arrow_datatype() {
1448                return Err(ChunkError::Malformed {
1449                    reason: format!(
1450                        "RowId data has the wrong datatype: expected {:?} but got {:?} instead",
1451                        RowId::arrow_datatype(),
1452                        *row_ids.data_type(),
1453                    ),
1454                });
1455            }
1456
1457            #[allow(clippy::collapsible_if)] // readability
1458            if cfg!(debug_assertions) {
1459                if *is_sorted != self.is_sorted_uncached() {
1460                    return Err(ChunkError::Malformed {
1461                        reason: format!(
1462                            "Chunk is marked as {}sorted but isn't: {row_ids:?}",
1463                            if *is_sorted { "" } else { "un" },
1464                        ),
1465                    });
1466                }
1467            }
1468        }
1469
1470        // Timelines
1471        for (timeline_name, time_column) in timelines {
1472            if time_column.times.len() != row_ids.len() {
1473                return Err(ChunkError::Malformed {
1474                    reason: format!(
1475                        "All timelines in a chunk must have the same number of timestamps, matching the number of row IDs. \
1476                         Found {} row IDs but {} timestamps for timeline '{timeline_name}'",
1477                        row_ids.len(),
1478                        time_column.times.len(),
1479                    ),
1480                });
1481            }
1482
1483            time_column.sanity_check()?;
1484        }
1485
1486        // Components
1487
1488        for (component_desc, list_array) in components.iter() {
1489            if let Some(c) = component_desc.component_type {
1490                c.sanity_check();
1491            }
1492            // Ensure that each cell is a list (we don't support mono-components yet).
1493            if let arrow::datatypes::DataType::List(_field) = list_array.data_type() {
1494                // We don't check `field.is_nullable()` here because we support both.
1495                // TODO(#6819): Remove support for inner nullability.
1496            } else {
1497                return Err(ChunkError::Malformed {
1498                    reason: format!(
1499                        "The inner array in a chunked component batch must be a list, got {:?}",
1500                        list_array.data_type(),
1501                    ),
1502                });
1503            }
1504
1505            if list_array.len() != row_ids.len() {
1506                return Err(ChunkError::Malformed {
1507                    reason: format!(
1508                        "All component batches in a chunk must have the same number of rows, matching the number of row IDs. \
1509                             Found {} row IDs but {} rows for component batch {component_desc}",
1510                        row_ids.len(),
1511                        list_array.len(),
1512                    ),
1513                });
1514            }
1515
1516            let validity_is_empty = list_array
1517                .nulls()
1518                .is_some_and(|validity| validity.is_empty());
1519            if !self.is_empty() && validity_is_empty {
1520                return Err(ChunkError::Malformed {
1521                    reason: format!(
1522                        "All component batches in a chunk must contain at least one non-null entry.\
1523                             Found a completely empty column for {component_desc}",
1524                    ),
1525                });
1526            }
1527        }
1528
1529        Ok(())
1530    }
1531}
1532
1533impl TimeColumn {
1534    /// Returns an error if the Chunk's invariants are not upheld.
1535    ///
1536    /// Costly checks are only run in debug builds.
1537    #[track_caller]
1538    pub fn sanity_check(&self) -> ChunkResult<()> {
1539        let Self {
1540            timeline: _,
1541            times,
1542            is_sorted,
1543            time_range,
1544        } = self;
1545
1546        let times = times.as_ref();
1547
1548        #[allow(clippy::collapsible_if)] // readability
1549        if cfg!(debug_assertions) {
1550            if *is_sorted != times.windows(2).all(|times| times[0] <= times[1]) {
1551                return Err(ChunkError::Malformed {
1552                    reason: format!(
1553                        "Time column is marked as {}sorted but isn't: {times:?}",
1554                        if *is_sorted { "" } else { "un" },
1555                    ),
1556                });
1557            }
1558        }
1559
1560        #[allow(clippy::collapsible_if)] // readability
1561        if cfg!(debug_assertions) {
1562            let is_tight_lower_bound = times.iter().any(|&time| time == time_range.min().as_i64());
1563            let is_tight_upper_bound = times.iter().any(|&time| time == time_range.max().as_i64());
1564            let is_tight_bound = is_tight_lower_bound && is_tight_upper_bound;
1565
1566            if !self.is_empty() && !is_tight_bound {
1567                return Err(ChunkError::Malformed {
1568                    reason: "Time column's cached time range isn't a tight bound.".to_owned(),
1569                });
1570            }
1571
1572            for &time in times {
1573                if time < time_range.min().as_i64() || time > time_range.max().as_i64() {
1574                    return Err(ChunkError::Malformed {
1575                        reason: format!(
1576                            "Time column's cached time range is wrong.\
1577                             Found a time value of {time} while its time range is {time_range:?}",
1578                        ),
1579                    });
1580                }
1581
1582                if time == TimeInt::STATIC.as_i64() {
1583                    return Err(ChunkError::Malformed {
1584                        reason: "A chunk's timeline should never contain a static time value."
1585                            .to_owned(),
1586                    });
1587                }
1588            }
1589        }
1590
1591        Ok(())
1592    }
1593}