// re_chunk/slice.rs

1use arrow::array::{
2    Array as _, ArrayRef as ArrowArrayRef, BooleanArray as ArrowBooleanArray,
3    ListArray as ArrowListArray,
4};
5use itertools::Itertools as _;
6use nohash_hasher::IntSet;
7use re_log_types::TimelineName;
8use re_types_core::{ComponentIdentifier, SerializedComponentColumn};
9
10use crate::{Chunk, RowId, TimeColumn};
11
12// ---
13
14// NOTE: Not worth writing tests for all of these, until some subtle bug comes up.
15// Most of them are indirectly stressed by our higher-level query tests anyhow.
16
17impl Chunk {
18    /// Returns the cell corresponding to the specified [`RowId`] for a given [`re_types_core::ComponentIdentifier`].
19    ///
20    /// This is `O(log(n))` if `self.is_sorted()`, and `O(n)` otherwise.
21    ///
22    /// Reminder: duplicated `RowId`s results in undefined behavior.
23    pub fn cell(&self, row_id: RowId, component: ComponentIdentifier) -> Option<ArrowArrayRef> {
24        let list_array = self.components.get_array(component)?;
25
26        if self.is_sorted() {
27            let row_ids = self.row_ids_slice();
28            let index = row_ids.binary_search(&row_id).ok()?;
29            list_array.is_valid(index).then(|| list_array.value(index))
30        } else {
31            let (index, _) = self.row_ids().find_position(|id| *id == row_id)?;
32            list_array.is_valid(index).then(|| list_array.value(index))
33        }
34    }
35
36    /// Shallow-slices the [`Chunk`] vertically.
37    ///
38    /// The result is a new [`Chunk`] with the same columns and (potentially) less rows.
39    ///
40    /// This cannot fail nor panic: `index` and `len` will be capped so that they cannot
41    /// run out of bounds.
42    /// This can result in an empty [`Chunk`] being returned if the slice is completely OOB.
43    ///
44    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
45    ///
46    /// ## When to use shallow vs. deep slicing?
47    ///
48    /// This operation is shallow and therefore always O(1), which implicitly means that it cannot
49    /// ever modify the values of the offsets themselves.
50    /// Since the offsets are left untouched, the original unsliced data must always be kept around
51    /// too, _even if the sliced data were to be written to disk_.
52    /// Similarly, the byte sizes reported by e.g. `Chunk::heap_size_bytes` might not always make intuitive
53    /// sense, and should be used very carefully.
54    ///
55    /// For these reasons, shallow slicing should only be used in the context of short-term, in-memory storage
56    /// (e.g. when slicing the results of a query).
57    /// When slicing data for long-term storage, whether in-memory or on disk, see [`Self::row_sliced_deep`] instead.
58    #[must_use]
59    pub fn row_sliced_shallow(&self, index: usize, len: usize) -> Self {
60        let deep = false;
61        self.row_sliced_impl(index, len, deep)
62    }
63
64    /// Deep-slices the [`Chunk`] vertically.
65    ///
66    /// The result is a new [`Chunk`] with the same columns and (potentially) less rows.
67    ///
68    /// This cannot fail nor panic: `index` and `len` will be capped so that they cannot
69    /// run out of bounds.
70    /// This can result in an empty [`Chunk`] being returned if the slice is completely OOB.
71    ///
72    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
73    ///
74    /// ## When to use shallow vs. deep slicing?
75    ///
76    /// This operation is deep and therefore always O(N).
77    ///
78    /// The underlying data, offsets, bitmaps and other buffers required will be reallocated, copied around,
79    /// and patched as much as required so that the resulting physical data becomes as packed as possible for
80    /// the desired slice.
81    /// Similarly, the byte sizes reported by e.g. `Chunk::heap_size_bytes` should always match intuitive expectations.
82    ///
83    /// These characteristics make deep slicing very useful for longer term data, whether it's stored
84    /// in-memory (e.g. in a `ChunkStore`), or on disk.
85    /// When slicing data for short-term needs (e.g. slicing the results of a query) prefer [`Self::row_sliced_shallow`] instead.
86    #[must_use]
87    pub fn row_sliced_deep(&self, index: usize, len: usize) -> Self {
88        let deep = true;
89        self.row_sliced_impl(index, len, deep)
90    }
91
    /// Shared implementation for [`Self::row_sliced_shallow`] and [`Self::row_sliced_deep`].
    ///
    /// `index` and `len` are clamped to the chunk's row count; a fully out-of-bounds or
    /// zero-length slice yields an empty chunk (see [`Self::emptied`]).
    #[must_use]
    fn row_sliced_impl(&self, index: usize, len: usize, deep: bool) -> Self {
        re_tracing::profile_function!(if deep { "deep" } else { "shallow" });

        // Destructure so that adding a field to `Chunk` forces this method to be revisited.
        let Self {
            id,
            entity_path,
            heap_size_bytes: _,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        // NOTE: Bound checking costs are completely dwarfed by everything else, and preventing the
        // viewer from crashing is more important than anything else in any case.

        if index >= self.num_rows() {
            return self.emptied();
        }

        // Clamp the requested range to the available rows; `saturating_*` keeps this
        // panic-free even for extreme `index`/`len` values.
        let end_offset = usize::min(index.saturating_add(len), self.num_rows());
        let len = end_offset.saturating_sub(index);

        if len == 0 {
            return self.emptied();
        }

        // A slice of 0 or 1 rows is trivially sorted, regardless of the original ordering.
        let is_sorted = *is_sorted || (len < 2);

        let mut chunk = Self {
            id: *id,
            entity_path: entity_path.clone(),
            // Invalidated by the slice; lazily recomputed on demand.
            heap_size_bytes: Default::default(),
            is_sorted,
            row_ids: if deep {
                re_arrow_util::deep_slice_array(row_ids, index, len)
            } else {
                row_ids.slice(index, len)
            },
            timelines: timelines
                .iter()
                .map(|(timeline, time_column)| (*timeline, time_column.row_sliced(index, len)))
                .collect(),
            components: components
                .values()
                .map(|column| {
                    SerializedComponentColumn::new(
                        if deep {
                            re_arrow_util::deep_slice_array(&column.list_array, index, len)
                        } else {
                            column.list_array.slice(index, len)
                        },
                        column.descriptor.clone(),
                    )
                })
                .collect(),
        };

        // We can know for sure whether the resulting chunk is already sorted (see conditional
        // above), but the reverse is not true.
        //
        // Consider e.g. slicing the following chunk on `(1..=3)`:
        // ┌──────────────┬───────────────────┬────────────────────────────────────────────┐
        // │ frame        ┆ example.MyColor   ┆ example.MyPoint                            │
        // ╞══════════════╪═══════════════════╪════════════════════════════════════════════╡
        // │ 3            ┆ [4278255873]      ┆ -                                          │
        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        // │ 1            ┆ -                 ┆ [{x: 1, y: 1}, {x: 2, y: 2}]               │
        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        // │ 2            ┆ -                 ┆ [{x: 1, y: 1}, {x: 2, y: 2}]               │
        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        // │ 3            ┆ -                 ┆ [{x: 1, y: 1}, {x: 2, y: 2}]               │
        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        // │ 5            ┆ -                 ┆ [{x: 3, y: 3}, {x: 4, y: 4}, {x: 5, y: 5}] │
        // └──────────────┴───────────────────┴────────────────────────────────────────────┘
        //
        // The original chunk is unsorted, but the new sliced one actually ends up being sorted.
        chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();

        #[cfg(debug_assertions)]
        #[expect(clippy::unwrap_used)] // debug-only
        chunk.sanity_check().unwrap();

        chunk
    }
178
179    /// Slices the [`Chunk`] horizontally by keeping only the selected `timeline`.
180    ///
181    /// The result is a new [`Chunk`] with the same rows and (at-most) one timeline column.
182    /// All non-timeline columns will be kept as-is.
183    ///
184    /// If `timeline` is not found within the [`Chunk`], the end result will be the same as the
185    /// current chunk but without any timeline column.
186    ///
187    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
188    #[must_use]
189    #[inline]
190    pub fn timeline_sliced(&self, timeline: TimelineName) -> Self {
191        let Self {
192            id,
193            entity_path,
194            heap_size_bytes: _,
195            is_sorted,
196            row_ids,
197            timelines,
198            components,
199        } = self;
200
201        let chunk = Self {
202            id: *id,
203            entity_path: entity_path.clone(),
204            heap_size_bytes: Default::default(),
205            is_sorted: *is_sorted,
206            row_ids: row_ids.clone(),
207            timelines: timelines
208                .get_key_value(&timeline)
209                .map(|(timeline, time_column)| (*timeline, time_column.clone()))
210                .into_iter()
211                .collect(),
212            components: components.clone(),
213        };
214
215        #[cfg(debug_assertions)]
216        #[expect(clippy::unwrap_used)] // debug-only
217        chunk.sanity_check().unwrap();
218
219        chunk
220    }
221
222    /// Slices the [`Chunk`] horizontally by keeping only the selected `component`.
223    ///
224    /// The result is a new [`Chunk`] with the same rows and (at-most) one component column.
225    /// All non-component columns will be kept as-is.
226    ///
227    /// If `component` is not found within the [`Chunk`], the end result will be the same as the
228    /// current chunk but without any component column.
229    ///
230    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
231    #[must_use]
232    #[inline]
233    pub fn component_sliced(&self, component: ComponentIdentifier) -> Self {
234        let Self {
235            id,
236            entity_path,
237            heap_size_bytes: _,
238            is_sorted,
239            row_ids,
240            timelines,
241            components,
242        } = self;
243
244        let chunk = Self {
245            id: *id,
246            entity_path: entity_path.clone(),
247            heap_size_bytes: Default::default(),
248            is_sorted: *is_sorted,
249            row_ids: row_ids.clone(),
250            timelines: timelines.clone(),
251            components: components
252                .get(component)
253                .map(|column| {
254                    SerializedComponentColumn::new(
255                        column.list_array.clone(),
256                        column.descriptor.clone(),
257                    )
258                })
259                .into_iter()
260                .collect(),
261        };
262
263        #[cfg(debug_assertions)]
264        #[expect(clippy::unwrap_used)] // debug-only
265        chunk.sanity_check().unwrap();
266
267        chunk
268    }
269
270    /// Slices the [`Chunk`] horizontally by keeping only the selected timelines.
271    ///
272    /// The result is a new [`Chunk`] with the same rows and (at-most) the selected timeline columns.
273    /// All non-timeline columns will be kept as-is.
274    ///
275    /// If none of the selected timelines exist in the [`Chunk`], the end result will be the same as the
276    /// current chunk but without any timeline column.
277    ///
278    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
279    #[must_use]
280    #[inline]
281    pub fn timelines_sliced(&self, timelines_to_keep: &IntSet<TimelineName>) -> Self {
282        let Self {
283            id,
284            entity_path,
285            heap_size_bytes: _,
286            is_sorted,
287            row_ids,
288            timelines,
289            components,
290        } = self;
291
292        let chunk = Self {
293            id: *id,
294            entity_path: entity_path.clone(),
295            heap_size_bytes: Default::default(),
296            is_sorted: *is_sorted,
297            row_ids: row_ids.clone(),
298            timelines: timelines
299                .iter()
300                .filter(|(timeline, _)| timelines_to_keep.contains(timeline))
301                .map(|(timeline, time_column)| (*timeline, time_column.clone()))
302                .collect(),
303            components: components.clone(),
304        };
305
306        #[cfg(debug_assertions)]
307        #[expect(clippy::unwrap_used)] // debug-only
308        chunk.sanity_check().unwrap();
309
310        chunk
311    }
312
    /// Densifies the [`Chunk`] vertically based on the `component_pov` column.
    ///
    /// Densifying here means dropping all rows where the associated value in the `component_pov`
    /// column is null.
    ///
    /// The result is a new [`Chunk`] where the `component_pov` column is guaranteed to be dense.
    ///
    /// If `component_pov` doesn't exist in this [`Chunk`], or if it is already dense, this method
    /// is a no-op.
    ///
    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
    #[must_use]
    #[inline]
    pub fn densified(&self, component_pov: ComponentIdentifier) -> Self {
        // Destructure so that adding a field to `Chunk` forces this method to be revisited.
        let Self {
            id,
            entity_path,
            heap_size_bytes: _,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        if self.is_empty() {
            return self.clone();
        }

        // No such component column: nothing to densify on.
        let Some(component_list_array) = self.components.get_array(component_pov) else {
            return self.clone();
        };

        // No validity bitmap means the column is already dense: nothing to do.
        let Some(validity) = component_list_array.nulls() else {
            return self.clone();
        };

        re_tracing::profile_function!();

        // The validity bitmap doubles as the row filter: keep exactly the rows where the
        // point-of-view component has data.
        let mask = validity.iter().collect_vec();
        // 0 or 1 surviving rows are trivially sorted.
        let is_sorted = *is_sorted || (mask.iter().filter(|&&b| b).count() < 2);
        let validity_filter = ArrowBooleanArray::from(mask);

        let mut chunk = Self {
            id: *id,
            entity_path: entity_path.clone(),
            // Invalidated by the filtering; lazily recomputed on demand.
            heap_size_bytes: Default::default(),
            is_sorted,
            row_ids: re_arrow_util::filter_array(row_ids, &validity_filter),
            timelines: timelines
                .iter()
                .map(|(&timeline, time_column)| (timeline, time_column.filtered(&validity_filter)))
                .collect(),
            components: components
                .values()
                .map(|column| {
                    let filtered =
                        re_arrow_util::filter_array(&column.list_array, &validity_filter);
                    let filtered = if column.descriptor.component == component_pov {
                        // Make sure we fully remove the validity bitmap for the densified
                        // component.
                        // This will allow further operations on this densified chunk to take some
                        // very optimized paths.
                        let (field, offsets, values, _nulls) = filtered.into_parts();
                        ArrowListArray::new(field, offsets, values, None)
                    } else {
                        filtered
                    };

                    SerializedComponentColumn::new(filtered, column.descriptor.clone())
                })
                .collect(),
        };

        // We can know for sure whether the resulting chunk is already sorted (see conditional
        // above), but the reverse is not true.
        //
        // Consider e.g. densifying the following chunk on `example.MyPoint`:
        // ┌──────────────┬───────────────────┬────────────────────────────────────────────┐
        // │ frame        ┆ example.MyColor   ┆ example.MyPoint                            │
        // ╞══════════════╪═══════════════════╪════════════════════════════════════════════╡
        // │ 3            ┆ [4278255873]      ┆ -                                          │
        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        // │ 1            ┆ -                 ┆ [{x: 1, y: 1}, {x: 2, y: 2}]               │
        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        // │ 5            ┆ -                 ┆ [{x: 3, y: 3}, {x: 4, y: 4}, {x: 5, y: 5}] │
        // └──────────────┴───────────────────┴────────────────────────────────────────────┘
        //
        // The original chunk is unsorted, but the new filtered one actually ends up being sorted.
        chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();

        #[cfg(debug_assertions)]
        #[expect(clippy::unwrap_used)] // debug-only
        chunk.sanity_check().unwrap();

        chunk
    }
409
410    /// Empties the [`Chunk`] vertically.
411    ///
412    /// The result is a new [`Chunk`] with the same columns but zero rows.
413    ///
414    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
415    #[must_use]
416    #[inline]
417    pub fn emptied(&self) -> Self {
418        let Self {
419            id,
420            entity_path,
421            heap_size_bytes: _,
422            is_sorted: _,
423            row_ids: _,
424            timelines,
425            components,
426        } = self;
427
428        re_tracing::profile_function!();
429
430        Self {
431            id: *id,
432            entity_path: entity_path.clone(),
433            heap_size_bytes: Default::default(),
434            is_sorted: true,
435            row_ids: RowId::arrow_from_slice(&[]),
436            timelines: timelines
437                .iter()
438                .map(|(&timeline, time_column)| (timeline, time_column.emptied()))
439                .collect(),
440            components: components
441                .values()
442                .map(|column| {
443                    let field = match column.list_array.data_type() {
444                        arrow::datatypes::DataType::List(field) => field.clone(),
445                        _ => unreachable!("This is always s list array"),
446                    };
447                    SerializedComponentColumn::new(
448                        ArrowListArray::new_null(field, 0),
449                        column.descriptor.clone(),
450                    )
451                })
452                .collect(),
453        }
454    }
455
456    /// Removes all component columns from the [`Chunk`].
457    ///
458    /// The result is a new [`Chunk`] with the same number of rows and the same index columns, but
459    /// no components.
460    ///
461    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
462    #[must_use]
463    #[inline]
464    pub fn components_removed(self) -> Self {
465        let Self {
466            id,
467            entity_path,
468            heap_size_bytes: _,
469            is_sorted,
470            row_ids,
471            timelines,
472            components: _,
473        } = self;
474
475        Self {
476            id,
477            entity_path,
478            heap_size_bytes: Default::default(), // (!) lazily recompute
479            is_sorted,
480            row_ids,
481            timelines,
482            components: Default::default(),
483        }
484    }
485
486    /// Removes duplicate rows from sections of consecutive identical indices.
487    ///
488    /// * If the [`Chunk`] is sorted on that index, the remaining values in the index column will be unique.
489    /// * If the [`Chunk`] has been densified on a specific column, the resulting chunk will
490    ///   effectively contain the latest value of that column for each given index value.
491    ///
492    /// If this is a temporal chunk and `timeline` isn't present in it, this method is a no-op.
493    ///
494    /// This does _not_ obey `RowId`-ordering semantics (or any other kind of semantics for that
495    /// matter) -- it merely respects how the chunk is currently laid out: no more, no less.
496    /// Sort the chunk according to the semantics you're looking for before calling this method.
497    //
498    // TODO(cmc): `Timeline` should really be `Index`.
499    #[inline]
500    pub fn deduped_latest_on_index(&self, index: &TimelineName) -> Self {
501        re_tracing::profile_function!();
502
503        if self.is_empty() {
504            return self.clone();
505        }
506
507        if self.is_static() {
508            return self.row_sliced_shallow(self.num_rows().saturating_sub(1), 1);
509        }
510
511        let Some(time_column) = self.timelines.get(index) else {
512            return self.clone();
513        };
514
515        let indices = {
516            let mut i = 0;
517            let indices = time_column
518                .times_raw()
519                .iter()
520                .copied()
521                .dedup_with_count()
522                .map(|(count, _time)| {
523                    i += count;
524
525                    // input is a 32-bit array, so this can't overflow/wrap
526                    #[expect(clippy::cast_possible_wrap)]
527                    {
528                        i.saturating_sub(1) as i32
529                    }
530                })
531                .collect_vec();
532            arrow::array::Int32Array::from(indices)
533        };
534
535        let chunk = Self {
536            id: self.id,
537            entity_path: self.entity_path.clone(),
538            heap_size_bytes: Default::default(),
539            is_sorted: self.is_sorted,
540            row_ids: re_arrow_util::take_array(
541                &self.row_ids,
542                &arrow::array::Int32Array::from(indices.clone()),
543            ),
544            timelines: self
545                .timelines
546                .iter()
547                .map(|(&timeline, time_column)| (timeline, time_column.taken(&indices)))
548                .collect(),
549            components: self
550                .components
551                .values()
552                .map(|column| {
553                    let filtered = re_arrow_util::take_array(&column.list_array, &indices);
554                    SerializedComponentColumn::new(filtered, column.descriptor.clone())
555                })
556                .collect(),
557        };
558
559        #[cfg(debug_assertions)]
560        #[expect(clippy::unwrap_used)] // debug-only
561        {
562            chunk.sanity_check().unwrap();
563        }
564
565        chunk
566    }
567
    /// Applies a [filter] kernel to the [`Chunk`] as a whole.
    ///
    /// Returns `None` if the length of the filter does not match the number of rows in the chunk.
    ///
    /// In release builds, filters are allowed to have null entries (they will be interpreted as `false`).
    /// In debug builds, null entries will panic.
    ///
    /// Note: a `filter` kernel _copies_ the data in order to make the resulting arrays contiguous in memory.
    ///
    /// [filter]: arrow::compute::kernels::filter
    ///
    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
    #[must_use]
    #[inline]
    pub fn filtered(&self, filter: &ArrowBooleanArray) -> Option<Self> {
        // Destructure so that adding a field to `Chunk` forces this method to be revisited.
        let Self {
            id,
            entity_path,
            heap_size_bytes: _,
            is_sorted,
            row_ids,
            timelines,
            components,
        } = self;

        // Safe early out to prevent panics in upstream kernel implementations.
        if filter.len() != self.num_rows() {
            return None;
        }

        if self.is_empty() {
            return Some(self.clone());
        }

        // NOTE(review): this counts set bits in the raw values buffer, ignoring the filter's
        // own validity bitmap — consistent with treating nulls as `false` only if null slots
        // have their value bit unset; confirm upstream guarantees.
        let num_filtered = filter.values().iter().filter(|&b| b).count();
        if num_filtered == 0 {
            return Some(self.emptied());
        }

        re_tracing::profile_function!();

        // 0 or 1 surviving rows are trivially sorted.
        let is_sorted = *is_sorted || num_filtered < 2;

        let mut chunk = Self {
            id: *id,
            entity_path: entity_path.clone(),
            // Invalidated by the filtering; lazily recomputed on demand.
            heap_size_bytes: Default::default(),
            is_sorted,
            row_ids: re_arrow_util::filter_array(row_ids, filter),
            timelines: timelines
                .iter()
                .map(|(&timeline, time_column)| (timeline, time_column.filtered(filter)))
                .collect(),
            components: components
                .values()
                .map(|column| {
                    let filtered = re_arrow_util::filter_array(&column.list_array, filter);
                    SerializedComponentColumn::new(filtered, column.descriptor.clone())
                })
                .collect(),
        };

        // We can know for sure whether the resulting chunk is already sorted (see conditional
        // above), but the reverse is not true.
        //
        // Consider e.g. densifying the following chunk on `example.MyPoint`:
        // ┌──────────────┬───────────────────┬────────────────────────────────────────────┐
        // │ frame        ┆ example.MyColor   ┆ example.MyPoint                            │
        // ╞══════════════╪═══════════════════╪════════════════════════════════════════════╡
        // │ 3            ┆ [4278255873]      ┆ -                                          │
        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        // │ 1            ┆ -                 ┆ [{x: 1, y: 1}, {x: 2, y: 2}]               │
        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
        // │ 5            ┆ -                 ┆ [{x: 3, y: 3}, {x: 4, y: 4}, {x: 5, y: 5}] │
        // └──────────────┴───────────────────┴────────────────────────────────────────────┘
        //
        // The original chunk is unsorted, but the new filtered one actually ends up being sorted.
        chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();

        #[cfg(debug_assertions)]
        #[expect(clippy::unwrap_used)] // debug-only
        chunk.sanity_check().unwrap();

        Some(chunk)
    }
653
654    /// Applies a [take] kernel to the [`Chunk`] as a whole.
655    ///
656    /// In release builds, indices are allowed to have null entries (they will be taken as `null`s).
657    /// In debug builds, null entries will panic.
658    ///
659    /// Note: a `take` kernel _copies_ the data in order to make the resulting arrays contiguous in memory.
660    ///
661    /// Takes care of up- and down-casting the data back and forth on behalf of the caller.
662    ///
663    /// [take]: arrow::compute::kernels::take
664    ///
665    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
666    #[must_use]
667    #[inline]
668    pub fn taken(&self, indices: &arrow::array::Int32Array) -> Self {
669        let Self {
670            id,
671            entity_path,
672            heap_size_bytes: _,
673            is_sorted,
674            row_ids,
675            timelines,
676            components,
677        } = self;
678
679        if self.is_empty() {
680            return self.clone();
681        }
682
683        if indices.is_empty() {
684            return self.emptied();
685        }
686
687        re_tracing::profile_function!();
688
689        let is_sorted = *is_sorted || (indices.len() < 2);
690
691        let mut chunk = Self {
692            id: *id,
693            entity_path: entity_path.clone(),
694            heap_size_bytes: Default::default(),
695            is_sorted,
696            row_ids: re_arrow_util::take_array(
697                row_ids,
698                &arrow::array::Int32Array::from(indices.clone()),
699            ),
700            timelines: timelines
701                .iter()
702                .map(|(&timeline, time_column)| (timeline, time_column.taken(indices)))
703                .collect(),
704            components: components
705                .values()
706                .map(|column| {
707                    let taken = re_arrow_util::take_array(&column.list_array, indices);
708                    SerializedComponentColumn::new(taken, column.descriptor.clone())
709                })
710                .collect(),
711        };
712
713        // We can know for sure whether the resulting chunk is already sorted (see conditional
714        // above), but the reverse is not true.
715        //
716        // Consider e.g. densifying the following chunk on `example.MyPoint`:
717        // ┌──────────────┬───────────────────┬────────────────────────────────────────────┐
718        // │ frame        ┆ example.MyColor   ┆ example.MyPoint                            │
719        // ╞══════════════╪═══════════════════╪════════════════════════════════════════════╡
720        // │ 3            ┆ [4278255873]      ┆ -                                          │
721        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
722        // │ 1            ┆ -                 ┆ [{x: 1, y: 1}, {x: 2, y: 2}]               │
723        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
724        // │ 5            ┆ -                 ┆ [{x: 3, y: 3}, {x: 4, y: 4}, {x: 5, y: 5}] │
725        // └──────────────┴───────────────────┴────────────────────────────────────────────┘
726        //
727        // The original chunk is unsorted, but the new filtered one actually ends up being sorted.
728        chunk.is_sorted = is_sorted || chunk.is_sorted_uncached();
729
730        #[cfg(debug_assertions)]
731        #[expect(clippy::unwrap_used)] // debug-only
732        chunk.sanity_check().unwrap();
733
734        chunk
735    }
736}
737
738impl TimeColumn {
739    /// Slices the [`TimeColumn`] vertically.
740    ///
741    /// The result is a new [`TimeColumn`] with the same timelines and (potentially) less rows.
742    ///
743    /// This cannot fail nor panic: `index` and `len` will be capped so that they cannot
744    /// run out of bounds.
745    /// This can result in an empty [`TimeColumn`] being returned if the slice is completely OOB.
746    #[inline]
747    pub fn row_sliced(&self, index: usize, len: usize) -> Self {
748        let Self {
749            timeline,
750            times,
751            is_sorted,
752            time_range: _,
753        } = self;
754
755        // NOTE: Bound checking costs are completely dwarfed by everything else, and preventing the
756        // viewer from crashing is more important than anything else in any case.
757
758        if index >= self.num_rows() {
759            return self.emptied();
760        }
761
762        let end_offset = usize::min(index.saturating_add(len), self.num_rows());
763        let len = end_offset.saturating_sub(index);
764
765        if len == 0 {
766            return self.emptied();
767        }
768
769        let is_sorted = *is_sorted || (len < 2);
770
771        // We can know for sure whether the resulting chunk is already sorted (see conditional
772        // above), but the reverse is not true.
773        //
774        // Consider e.g. slicing the following chunk on `(1..=3)`:
775        // ┌──────────────┬───────────────────┬────────────────────────────────────────────┐
776        // │ frame        ┆ example.MyColor   ┆ example.MyPoint                            │
777        // ╞══════════════╪═══════════════════╪════════════════════════════════════════════╡
778        // │ 3            ┆ [4278255873]      ┆ -                                          │
779        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
780        // │ 1            ┆ -                 ┆ [{x: 1, y: 1}, {x: 2, y: 2}]               │
781        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
782        // │ 2            ┆ -                 ┆ [{x: 1, y: 1}, {x: 2, y: 2}]               │
783        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
784        // │ 3            ┆ -                 ┆ [{x: 1, y: 1}, {x: 2, y: 2}]               │
785        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
786        // │ 5            ┆ -                 ┆ [{x: 3, y: 3}, {x: 4, y: 4}, {x: 5, y: 5}] │
787        // └──────────────┴───────────────────┴────────────────────────────────────────────┘
788        //
789        // The original chunk is unsorted, but the new sliced one actually ends up being sorted.
790        let is_sorted_opt = is_sorted.then_some(is_sorted);
791
792        Self::new(is_sorted_opt, *timeline, times.clone().slice(index, len))
793    }
794
795    /// Empties the [`TimeColumn`] vertically.
796    ///
797    /// The result is a new [`TimeColumn`] with the same columns but zero rows.
798    #[inline]
799    pub fn emptied(&self) -> Self {
800        let Self {
801            timeline,
802            times: _,
803            is_sorted: _,
804            time_range: _,
805        } = self;
806
807        Self::new(Some(true), *timeline, vec![].into())
808    }
809
810    /// Runs a [filter] compute kernel on the time data with the specified `mask`.
811    ///
812    /// [filter]: arrow::compute::kernels::filter
813    #[inline]
814    pub(crate) fn filtered(&self, filter: &ArrowBooleanArray) -> Self {
815        let Self {
816            timeline,
817            times,
818            is_sorted,
819            time_range: _,
820        } = self;
821
822        let is_sorted = *is_sorted || filter.values().iter().filter(|&b| b).count() < 2;
823
824        // We can know for sure whether the resulting chunk is already sorted (see conditional
825        // above), but the reverse is not true.
826        //
827        // Consider e.g. densifying the following chunk on `example.MyPoint`:
828        // ┌──────────────┬───────────────────┬────────────────────────────────────────────┐
829        // │ frame        ┆ example.MyColor   ┆ example.MyPoint                            │
830        // ╞══════════════╪═══════════════════╪════════════════════════════════════════════╡
831        // │ 3            ┆ [4278255873]      ┆ -                                          │
832        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
833        // │ 1            ┆ -                 ┆ [{x: 1, y: 1}, {x: 2, y: 2}]               │
834        // ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
835        // │ 5            ┆ -                 ┆ [{x: 3, y: 3}, {x: 4, y: 4}, {x: 5, y: 5}] │
836        // └──────────────┴───────────────────┴────────────────────────────────────────────┘
837        //
838        // The original chunk is unsorted, but the new filtered one actually ends up being sorted.
839        let is_sorted_opt = is_sorted.then_some(is_sorted);
840
841        Self::new(
842            is_sorted_opt,
843            *timeline,
844            re_arrow_util::filter_array(
845                &arrow::array::Int64Array::new(times.clone(), None),
846                filter,
847            )
848            .into_parts()
849            .1,
850        )
851    }
852
853    /// Runs a [take] compute kernel on the time data with the specified `indices`.
854    ///
855    /// [take]: arrow::compute::take
856    #[inline]
857    pub(crate) fn taken(&self, indices: &arrow::array::Int32Array) -> Self {
858        let Self {
859            timeline,
860            times,
861            is_sorted,
862            time_range: _,
863        } = self;
864
865        let new_times = re_arrow_util::take_array(
866            &arrow::array::Int64Array::new(times.clone(), None),
867            &arrow::array::Int32Array::from(indices.clone()),
868        )
869        .into_parts()
870        .1;
871
872        Self::new(Some(*is_sorted), *timeline, new_times)
873    }
874}
875
876// ---
877
878#[cfg(test)]
879mod tests {
880    #![expect(clippy::cast_possible_wrap)]
881
882    use itertools::Itertools as _;
883    use re_log_types::TimePoint;
884    use re_log_types::example_components::{MyColor, MyLabel, MyPoint, MyPoints};
885
886    use super::*;
887    use crate::{Chunk, RowId, Timeline};
888
889    #[test]
890    fn cell() -> anyhow::Result<()> {
891        let mypoints_points_component = MyPoints::descriptor_points().component;
892        let mypoints_colors_component = MyPoints::descriptor_colors().component;
893        let mypoints_labels_component = MyPoints::descriptor_labels().component;
894
895        let entity_path = "my/entity";
896
897        let row_id1 = RowId::ZERO.incremented_by(10);
898        let row_id2 = RowId::ZERO.incremented_by(20);
899        let row_id3 = RowId::ZERO.incremented_by(30);
900        let row_id4 = RowId::new();
901        let row_id5 = RowId::new();
902
903        let timepoint1 = [
904            (Timeline::log_time(), 1000),
905            (Timeline::new_sequence("frame"), 1),
906        ];
907        let timepoint2 = [
908            (Timeline::log_time(), 1032),
909            (Timeline::new_sequence("frame"), 3),
910        ];
911        let timepoint3 = [
912            (Timeline::log_time(), 1064),
913            (Timeline::new_sequence("frame"), 5),
914        ];
915        let timepoint4 = [
916            (Timeline::log_time(), 1096),
917            (Timeline::new_sequence("frame"), 7),
918        ];
919        let timepoint5 = [
920            (Timeline::log_time(), 1128),
921            (Timeline::new_sequence("frame"), 9),
922        ];
923
924        let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
925        let points3 = &[MyPoint::new(6.0, 7.0)];
926
927        let colors4 = &[MyColor::from_rgb(1, 1, 1)];
928        let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];
929
930        let labels1 = &[MyLabel("a".into())];
931        let labels2 = &[MyLabel("b".into())];
932        let labels3 = &[MyLabel("c".into())];
933        let labels4 = &[MyLabel("d".into())];
934        let labels5 = &[MyLabel("e".into())];
935
936        let mut chunk = Chunk::builder(entity_path)
937            .with_sparse_component_batches(
938                row_id2,
939                timepoint4,
940                [
941                    (MyPoints::descriptor_points(), None),
942                    (MyPoints::descriptor_colors(), Some(colors4 as _)),
943                    (MyPoints::descriptor_labels(), Some(labels4 as _)),
944                ],
945            )
946            .with_sparse_component_batches(
947                row_id5,
948                timepoint5,
949                [
950                    (MyPoints::descriptor_points(), None),
951                    (MyPoints::descriptor_colors(), Some(colors5 as _)),
952                    (MyPoints::descriptor_labels(), Some(labels5 as _)),
953                ],
954            )
955            .with_sparse_component_batches(
956                row_id1,
957                timepoint3,
958                [
959                    (MyPoints::descriptor_points(), Some(points1 as _)),
960                    (MyPoints::descriptor_colors(), None),
961                    (MyPoints::descriptor_labels(), Some(labels1 as _)),
962                ],
963            )
964            .with_sparse_component_batches(
965                row_id4,
966                timepoint2,
967                [
968                    (MyPoints::descriptor_points(), None),
969                    (MyPoints::descriptor_colors(), None),
970                    (MyPoints::descriptor_labels(), Some(labels2 as _)),
971                ],
972            )
973            .with_sparse_component_batches(
974                row_id3,
975                timepoint1,
976                [
977                    (MyPoints::descriptor_points(), Some(points3 as _)),
978                    (MyPoints::descriptor_colors(), None),
979                    (MyPoints::descriptor_labels(), Some(labels3 as _)),
980                ],
981            )
982            .build()?;
983
984        eprintln!("chunk:\n{chunk}");
985
986        let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
987            (row_id1, mypoints_points_component, Some(points1 as _)),
988            (row_id2, mypoints_labels_component, Some(labels4 as _)),
989            (row_id3, mypoints_colors_component, None),
990            (row_id4, mypoints_labels_component, Some(labels2 as _)),
991            (row_id5, mypoints_colors_component, Some(colors5 as _)),
992        ];
993
994        assert!(!chunk.is_sorted());
995        for (row_id, component, expected) in expectations {
996            let expected = expected
997                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
998            eprintln!("{component} @ {row_id}");
999            similar_asserts::assert_eq!(expected, chunk.cell(*row_id, *component));
1000        }
1001
1002        chunk.sort_if_unsorted();
1003        assert!(chunk.is_sorted());
1004
1005        for (row_id, component, expected) in expectations {
1006            let expected = expected
1007                .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
1008            eprintln!("{component} @ {row_id}");
1009            similar_asserts::assert_eq!(expected, chunk.cell(*row_id, *component));
1010        }
1011
1012        Ok(())
1013    }
1014
1015    #[test]
1016    fn dedupe_temporal() -> anyhow::Result<()> {
1017        let mypoints_points_component = MyPoints::descriptor_points().component;
1018        let mypoints_colors_component = MyPoints::descriptor_colors().component;
1019        let mypoints_labels_component = MyPoints::descriptor_labels().component;
1020
1021        let entity_path = "my/entity";
1022
1023        let row_id1 = RowId::new();
1024        let row_id2 = RowId::new();
1025        let row_id3 = RowId::new();
1026        let row_id4 = RowId::new();
1027        let row_id5 = RowId::new();
1028
1029        let timepoint1 = [
1030            (Timeline::log_time(), 1000),
1031            (Timeline::new_sequence("frame"), 1),
1032        ];
1033        let timepoint2 = [
1034            (Timeline::log_time(), 1032),
1035            (Timeline::new_sequence("frame"), 1),
1036        ];
1037        let timepoint3 = [
1038            (Timeline::log_time(), 1064),
1039            (Timeline::new_sequence("frame"), 1),
1040        ];
1041        let timepoint4 = [
1042            (Timeline::log_time(), 1096),
1043            (Timeline::new_sequence("frame"), 2),
1044        ];
1045        let timepoint5 = [
1046            (Timeline::log_time(), 1128),
1047            (Timeline::new_sequence("frame"), 2),
1048        ];
1049
1050        let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
1051        let points3 = &[MyPoint::new(6.0, 7.0)];
1052
1053        let colors4 = &[MyColor::from_rgb(1, 1, 1)];
1054        let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];
1055
1056        let labels1 = &[MyLabel("a".into())];
1057        let labels2 = &[MyLabel("b".into())];
1058        let labels3 = &[MyLabel("c".into())];
1059        let labels4 = &[MyLabel("d".into())];
1060        let labels5 = &[MyLabel("e".into())];
1061
1062        let chunk = Chunk::builder(entity_path)
1063            .with_sparse_component_batches(
1064                row_id1,
1065                timepoint1,
1066                [
1067                    (MyPoints::descriptor_points(), Some(points1 as _)),
1068                    (MyPoints::descriptor_colors(), None),
1069                    (MyPoints::descriptor_labels(), Some(labels1 as _)),
1070                ],
1071            )
1072            .with_sparse_component_batches(
1073                row_id2,
1074                timepoint2,
1075                [
1076                    (MyPoints::descriptor_points(), None),
1077                    (MyPoints::descriptor_colors(), None),
1078                    (MyPoints::descriptor_labels(), Some(labels2 as _)),
1079                ],
1080            )
1081            .with_sparse_component_batches(
1082                row_id3,
1083                timepoint3,
1084                [
1085                    (MyPoints::descriptor_points(), Some(points3 as _)),
1086                    (MyPoints::descriptor_colors(), None),
1087                    (MyPoints::descriptor_labels(), Some(labels3 as _)),
1088                ],
1089            )
1090            .with_sparse_component_batches(
1091                row_id4,
1092                timepoint4,
1093                [
1094                    (MyPoints::descriptor_points(), None),
1095                    (MyPoints::descriptor_colors(), Some(colors4 as _)),
1096                    (MyPoints::descriptor_labels(), Some(labels4 as _)),
1097                ],
1098            )
1099            .with_sparse_component_batches(
1100                row_id5,
1101                timepoint5,
1102                [
1103                    (MyPoints::descriptor_points(), None),
1104                    (MyPoints::descriptor_colors(), Some(colors5 as _)),
1105                    (MyPoints::descriptor_labels(), Some(labels5 as _)),
1106                ],
1107            )
1108            .build()?;
1109
1110        eprintln!("chunk:\n{chunk}");
1111
1112        {
1113            let got = chunk.deduped_latest_on_index(&TimelineName::new("frame"));
1114            eprintln!("got:\n{got}");
1115            assert_eq!(2, got.num_rows());
1116
1117            let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
1118                (row_id3, mypoints_points_component, Some(points3 as _)),
1119                (row_id3, mypoints_colors_component, None),
1120                (row_id3, mypoints_labels_component, Some(labels3 as _)),
1121                //
1122                (row_id5, mypoints_points_component, None),
1123                (row_id5, mypoints_colors_component, Some(colors5 as _)),
1124                (row_id5, mypoints_labels_component, Some(labels5 as _)),
1125            ];
1126
1127            for (row_id, component, expected) in expectations {
1128                let expected = expected
1129                    .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
1130                eprintln!("{component} @ {row_id}");
1131                similar_asserts::assert_eq!(expected, chunk.cell(*row_id, *component));
1132            }
1133        }
1134
1135        {
1136            let got = chunk.deduped_latest_on_index(&TimelineName::log_time());
1137            eprintln!("got:\n{got}");
1138            assert_eq!(5, got.num_rows());
1139
1140            let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
1141                (row_id1, mypoints_points_component, Some(points1 as _)),
1142                (row_id1, mypoints_colors_component, None),
1143                (row_id1, mypoints_labels_component, Some(labels1 as _)),
1144                (row_id2, mypoints_points_component, None),
1145                (row_id2, mypoints_colors_component, None),
1146                (row_id2, mypoints_labels_component, Some(labels2 as _)),
1147                (row_id3, mypoints_points_component, Some(points3 as _)),
1148                (row_id3, mypoints_colors_component, None),
1149                (row_id3, mypoints_labels_component, Some(labels3 as _)),
1150                (row_id4, mypoints_points_component, None),
1151                (row_id4, mypoints_colors_component, Some(colors4 as _)),
1152                (row_id4, mypoints_labels_component, Some(labels4 as _)),
1153                (row_id5, mypoints_points_component, None),
1154                (row_id5, mypoints_colors_component, Some(colors5 as _)),
1155                (row_id5, mypoints_labels_component, Some(labels5 as _)),
1156            ];
1157
1158            for (row_id, component, expected) in expectations {
1159                let expected = expected
1160                    .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
1161                eprintln!("{component} @ {row_id}");
1162                similar_asserts::assert_eq!(expected, chunk.cell(*row_id, *component));
1163            }
1164        }
1165
1166        Ok(())
1167    }
1168
1169    #[test]
1170    fn dedupe_static() -> anyhow::Result<()> {
1171        let mypoints_points_component = MyPoints::descriptor_points().component;
1172        let mypoints_colors_component = MyPoints::descriptor_colors().component;
1173        let mypoints_labels_component = MyPoints::descriptor_labels().component;
1174
1175        let entity_path = "my/entity";
1176
1177        let row_id1 = RowId::new();
1178        let row_id2 = RowId::new();
1179        let row_id3 = RowId::new();
1180        let row_id4 = RowId::new();
1181        let row_id5 = RowId::new();
1182
1183        let timepoint_static = TimePoint::default();
1184
1185        let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
1186        let points3 = &[MyPoint::new(6.0, 7.0)];
1187
1188        let colors4 = &[MyColor::from_rgb(1, 1, 1)];
1189        let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];
1190
1191        let labels1 = &[MyLabel("a".into())];
1192        let labels2 = &[MyLabel("b".into())];
1193        let labels3 = &[MyLabel("c".into())];
1194        let labels4 = &[MyLabel("d".into())];
1195        let labels5 = &[MyLabel("e".into())];
1196
1197        let chunk = Chunk::builder(entity_path)
1198            .with_sparse_component_batches(
1199                row_id1,
1200                timepoint_static.clone(),
1201                [
1202                    (MyPoints::descriptor_points(), Some(points1 as _)),
1203                    (MyPoints::descriptor_colors(), None),
1204                    (MyPoints::descriptor_labels(), Some(labels1 as _)),
1205                ],
1206            )
1207            .with_sparse_component_batches(
1208                row_id2,
1209                timepoint_static.clone(),
1210                [
1211                    (MyPoints::descriptor_points(), None),
1212                    (MyPoints::descriptor_colors(), None),
1213                    (MyPoints::descriptor_labels(), Some(labels2 as _)),
1214                ],
1215            )
1216            .with_sparse_component_batches(
1217                row_id3,
1218                timepoint_static.clone(),
1219                [
1220                    (MyPoints::descriptor_points(), Some(points3 as _)),
1221                    (MyPoints::descriptor_colors(), None),
1222                    (MyPoints::descriptor_labels(), Some(labels3 as _)),
1223                ],
1224            )
1225            .with_sparse_component_batches(
1226                row_id4,
1227                timepoint_static.clone(),
1228                [
1229                    (MyPoints::descriptor_points(), None),
1230                    (MyPoints::descriptor_colors(), Some(colors4 as _)),
1231                    (MyPoints::descriptor_labels(), Some(labels4 as _)),
1232                ],
1233            )
1234            .with_sparse_component_batches(
1235                row_id5,
1236                timepoint_static.clone(),
1237                [
1238                    (MyPoints::descriptor_points(), None),
1239                    (MyPoints::descriptor_colors(), Some(colors5 as _)),
1240                    (MyPoints::descriptor_labels(), Some(labels5 as _)),
1241                ],
1242            )
1243            .build()?;
1244
1245        eprintln!("chunk:\n{chunk}");
1246
1247        {
1248            let got = chunk.deduped_latest_on_index(&TimelineName::new("frame"));
1249            eprintln!("got:\n{got}");
1250            assert_eq!(1, got.num_rows());
1251
1252            let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
1253                (row_id5, mypoints_points_component, None),
1254                (row_id5, mypoints_colors_component, Some(colors5 as _)),
1255                (row_id5, mypoints_labels_component, Some(labels5 as _)),
1256            ];
1257
1258            for (row_id, component, expected) in expectations {
1259                let expected = expected
1260                    .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
1261                eprintln!("{component} @ {row_id}");
1262                similar_asserts::assert_eq!(expected, chunk.cell(*row_id, *component));
1263            }
1264        }
1265
1266        {
1267            let got = chunk.deduped_latest_on_index(&TimelineName::log_time());
1268            eprintln!("got:\n{got}");
1269            assert_eq!(1, got.num_rows());
1270
1271            let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
1272                (row_id5, mypoints_points_component, None),
1273                (row_id5, mypoints_colors_component, Some(colors5 as _)),
1274                (row_id5, mypoints_labels_component, Some(labels5 as _)),
1275            ];
1276
1277            for (row_id, component, expected) in expectations {
1278                let expected = expected
1279                    .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
1280                eprintln!("{component} @ {row_id}");
1281                similar_asserts::assert_eq!(expected, chunk.cell(*row_id, *component));
1282            }
1283        }
1284
1285        Ok(())
1286    }
1287
1288    #[test]
1289    fn filtered() -> anyhow::Result<()> {
1290        let mypoints_points_component = MyPoints::descriptor_points().component;
1291        let mypoints_colors_component = MyPoints::descriptor_colors().component;
1292        let mypoints_labels_component = MyPoints::descriptor_labels().component;
1293
1294        let entity_path = "my/entity";
1295
1296        let row_id1 = RowId::new();
1297        let row_id2 = RowId::new();
1298        let row_id3 = RowId::new();
1299        let row_id4 = RowId::new();
1300        let row_id5 = RowId::new();
1301
1302        let timepoint1 = [
1303            (Timeline::log_time(), 1000),
1304            (Timeline::new_sequence("frame"), 1),
1305        ];
1306        let timepoint2 = [
1307            (Timeline::log_time(), 1032),
1308            (Timeline::new_sequence("frame"), 1),
1309        ];
1310        let timepoint3 = [
1311            (Timeline::log_time(), 1064),
1312            (Timeline::new_sequence("frame"), 1),
1313        ];
1314        let timepoint4 = [
1315            (Timeline::log_time(), 1096),
1316            (Timeline::new_sequence("frame"), 2),
1317        ];
1318        let timepoint5 = [
1319            (Timeline::log_time(), 1128),
1320            (Timeline::new_sequence("frame"), 2),
1321        ];
1322
1323        let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
1324        let points3 = &[MyPoint::new(6.0, 7.0)];
1325
1326        let colors4 = &[MyColor::from_rgb(1, 1, 1)];
1327        let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];
1328
1329        let labels1 = &[MyLabel("a".into())];
1330        let labels2 = &[MyLabel("b".into())];
1331        let labels3 = &[MyLabel("c".into())];
1332        let labels4 = &[MyLabel("d".into())];
1333        let labels5 = &[MyLabel("e".into())];
1334
1335        let chunk = Chunk::builder(entity_path)
1336            .with_sparse_component_batches(
1337                row_id1,
1338                timepoint1,
1339                [
1340                    (MyPoints::descriptor_points(), Some(points1 as _)),
1341                    (MyPoints::descriptor_colors(), None),
1342                    (MyPoints::descriptor_labels(), Some(labels1 as _)),
1343                ],
1344            )
1345            .with_sparse_component_batches(
1346                row_id2,
1347                timepoint2,
1348                [
1349                    (MyPoints::descriptor_points(), None),
1350                    (MyPoints::descriptor_colors(), None),
1351                    (MyPoints::descriptor_labels(), Some(labels2 as _)),
1352                ],
1353            )
1354            .with_sparse_component_batches(
1355                row_id3,
1356                timepoint3,
1357                [
1358                    (MyPoints::descriptor_points(), Some(points3 as _)),
1359                    (MyPoints::descriptor_colors(), None),
1360                    (MyPoints::descriptor_labels(), Some(labels3 as _)),
1361                ],
1362            )
1363            .with_sparse_component_batches(
1364                row_id4,
1365                timepoint4,
1366                [
1367                    (MyPoints::descriptor_points(), None),
1368                    (MyPoints::descriptor_colors(), Some(colors4 as _)),
1369                    (MyPoints::descriptor_labels(), Some(labels4 as _)),
1370                ],
1371            )
1372            .with_sparse_component_batches(
1373                row_id5,
1374                timepoint5,
1375                [
1376                    (MyPoints::descriptor_points(), None),
1377                    (MyPoints::descriptor_colors(), Some(colors5 as _)),
1378                    (MyPoints::descriptor_labels(), Some(labels5 as _)),
1379                ],
1380            )
1381            .build()?;
1382
1383        eprintln!("chunk:\n{chunk}");
1384
1385        // basic
1386        {
1387            let filter =
1388                ArrowBooleanArray::from((0..chunk.num_rows()).map(|i| i % 2 == 0).collect_vec());
1389            let got = chunk.filtered(&filter).unwrap();
1390            eprintln!("got:\n{got}");
1391            assert_eq!(
1392                filter.values().iter().filter(|&b| b).count(),
1393                got.num_rows()
1394            );
1395
1396            let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
1397                (row_id1, mypoints_points_component, Some(points1 as _)),
1398                (row_id1, mypoints_colors_component, None),
1399                (row_id1, mypoints_labels_component, Some(labels1 as _)),
1400                //
1401                (row_id3, mypoints_points_component, Some(points3 as _)),
1402                (row_id3, mypoints_colors_component, None),
1403                (row_id3, mypoints_labels_component, Some(labels3 as _)),
1404                //
1405                (row_id5, mypoints_points_component, None),
1406                (row_id5, mypoints_colors_component, Some(colors5 as _)),
1407                (row_id5, mypoints_labels_component, Some(labels5 as _)),
1408            ];
1409
1410            for (row_id, component, expected) in expectations {
1411                let expected = expected
1412                    .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
1413                eprintln!("{component} @ {row_id}");
1414                similar_asserts::assert_eq!(expected, chunk.cell(*row_id, *component));
1415            }
1416        }
1417
1418        // shorter
1419        {
1420            let filter = ArrowBooleanArray::from(
1421                (0..chunk.num_rows() / 2).map(|i| i % 2 == 0).collect_vec(),
1422            );
1423            let got = chunk.filtered(&filter);
1424            assert!(got.is_none());
1425        }
1426
1427        // longer
1428        {
1429            let filter = ArrowBooleanArray::from(
1430                (0..chunk.num_rows() * 2).map(|i| i % 2 == 0).collect_vec(),
1431            );
1432            let got = chunk.filtered(&filter);
1433            assert!(got.is_none());
1434        }
1435
1436        Ok(())
1437    }
1438
1439    #[test]
1440    fn taken() -> anyhow::Result<()> {
1441        use arrow::array::Int32Array as ArrowInt32Array;
1442
1443        let mypoints_points_component = MyPoints::descriptor_points().component;
1444        let mypoints_colors_component = MyPoints::descriptor_colors().component;
1445        let mypoints_labels_component = MyPoints::descriptor_labels().component;
1446
1447        let entity_path = "my/entity";
1448
1449        let row_id1 = RowId::new();
1450        let row_id2 = RowId::new();
1451        let row_id3 = RowId::new();
1452        let row_id4 = RowId::new();
1453        let row_id5 = RowId::new();
1454
1455        let timepoint1 = [
1456            (Timeline::log_time(), 1000),
1457            (Timeline::new_sequence("frame"), 1),
1458        ];
1459        let timepoint2 = [
1460            (Timeline::log_time(), 1032),
1461            (Timeline::new_sequence("frame"), 1),
1462        ];
1463        let timepoint3 = [
1464            (Timeline::log_time(), 1064),
1465            (Timeline::new_sequence("frame"), 1),
1466        ];
1467        let timepoint4 = [
1468            (Timeline::log_time(), 1096),
1469            (Timeline::new_sequence("frame"), 2),
1470        ];
1471        let timepoint5 = [
1472            (Timeline::log_time(), 1128),
1473            (Timeline::new_sequence("frame"), 2),
1474        ];
1475
1476        let points1 = &[MyPoint::new(1.0, 1.0), MyPoint::new(2.0, 2.0)];
1477        let points3 = &[MyPoint::new(6.0, 7.0)];
1478
1479        let colors4 = &[MyColor::from_rgb(1, 1, 1)];
1480        let colors5 = &[MyColor::from_rgb(2, 2, 2), MyColor::from_rgb(3, 3, 3)];
1481
1482        let labels1 = &[MyLabel("a".into())];
1483        let labels2 = &[MyLabel("b".into())];
1484        let labels3 = &[MyLabel("c".into())];
1485        let labels4 = &[MyLabel("d".into())];
1486        let labels5 = &[MyLabel("e".into())];
1487
1488        let chunk = Chunk::builder(entity_path)
1489            .with_sparse_component_batches(
1490                row_id1,
1491                timepoint1,
1492                [
1493                    (MyPoints::descriptor_points(), Some(points1 as _)),
1494                    (MyPoints::descriptor_colors(), None),
1495                    (MyPoints::descriptor_labels(), Some(labels1 as _)),
1496                ],
1497            )
1498            .with_sparse_component_batches(
1499                row_id2,
1500                timepoint2,
1501                [
1502                    (MyPoints::descriptor_points(), None),
1503                    (MyPoints::descriptor_colors(), None),
1504                    (MyPoints::descriptor_labels(), Some(labels2 as _)),
1505                ],
1506            )
1507            .with_sparse_component_batches(
1508                row_id3,
1509                timepoint3,
1510                [
1511                    (MyPoints::descriptor_points(), Some(points3 as _)),
1512                    (MyPoints::descriptor_colors(), None),
1513                    (MyPoints::descriptor_labels(), Some(labels3 as _)),
1514                ],
1515            )
1516            .with_sparse_component_batches(
1517                row_id4,
1518                timepoint4,
1519                [
1520                    (MyPoints::descriptor_points(), None),
1521                    (MyPoints::descriptor_colors(), Some(colors4 as _)),
1522                    (MyPoints::descriptor_labels(), Some(labels4 as _)),
1523                ],
1524            )
1525            .with_sparse_component_batches(
1526                row_id5,
1527                timepoint5,
1528                [
1529                    (MyPoints::descriptor_points(), None),
1530                    (MyPoints::descriptor_colors(), Some(colors5 as _)),
1531                    (MyPoints::descriptor_labels(), Some(labels5 as _)),
1532                ],
1533            )
1534            .build()?;
1535
1536        eprintln!("chunk:\n{chunk}");
1537
1538        // basic
1539        {
1540            let indices = ArrowInt32Array::from(
1541                (0..chunk.num_rows() as i32)
1542                    .filter(|i| i % 2 == 0)
1543                    .collect_vec(),
1544            );
1545            let got = chunk.taken(&indices);
1546            eprintln!("got:\n{got}");
1547            assert_eq!(indices.len(), got.num_rows());
1548
1549            let expectations: &[(_, _, Option<&dyn re_types_core::ComponentBatch>)] = &[
1550                (row_id1, mypoints_points_component, Some(points1 as _)),
1551                (row_id1, mypoints_colors_component, None),
1552                (row_id1, mypoints_labels_component, Some(labels1 as _)),
1553                //
1554                (row_id3, mypoints_points_component, Some(points3 as _)),
1555                (row_id3, mypoints_colors_component, None),
1556                (row_id3, mypoints_labels_component, Some(labels3 as _)),
1557                //
1558                (row_id5, mypoints_points_component, None),
1559                (row_id5, mypoints_colors_component, Some(colors5 as _)),
1560                (row_id5, mypoints_labels_component, Some(labels5 as _)),
1561            ];
1562
1563            for (row_id, component, expected) in expectations {
1564                let expected = expected
1565                    .and_then(|expected| re_types_core::ComponentBatch::to_arrow(expected).ok());
1566                eprintln!("{component} @ {row_id}");
1567                similar_asserts::assert_eq!(expected, chunk.cell(*row_id, *component));
1568            }
1569        }
1570
1571        // repeated
1572        {
1573            let indices = ArrowInt32Array::from(
1574                std::iter::repeat_n(2i32, chunk.num_rows() * 2).collect_vec(),
1575            );
1576            let got = chunk.taken(&indices);
1577            eprintln!("got:\n{got}");
1578            assert_eq!(indices.len(), got.num_rows());
1579        }
1580
1581        Ok(())
1582    }
1583
1584    #[test]
1585    fn slice_memory_size_conservation() -> anyhow::Result<()> {
1586        use arrow::array::{ListArray as ArrowListArray, UInt8Array as ArrowUInt8Array};
1587        use arrow::buffer::OffsetBuffer as ArrowOffsetBuffer;
1588        use re_byte_size::SizeBytes as _;
1589        use re_types_core::{ComponentDescriptor, SerializedComponentColumn};
1590
1591        // Create a chunk with 3 rows of raw blob data with different sizes
1592        let entity_path = "test/entity";
1593
1594        let row_id1 = RowId::new();
1595        let row_id2 = RowId::new();
1596        let row_id3 = RowId::new();
1597
1598        // Create blob data of different sizes
1599        let blob_size_1 = 10_000; // 10KB
1600        let blob_size_2 = 20_000; // 20KB
1601        let blob_size_3 = 30_000; // 30KB
1602
1603        let blob_data_1: Vec<u8> = (0..blob_size_1 as u8).cycle().take(blob_size_1).collect();
1604        let blob_data_2: Vec<u8> = (0..blob_size_2 as u8).cycle().take(blob_size_2).collect();
1605        let blob_data_3: Vec<u8> = (0..blob_size_3 as u8).cycle().take(blob_size_3).collect();
1606
1607        // Combine all blob data for the ListArray
1608        let mut all_blob_data: Vec<u8> = Vec::new();
1609        all_blob_data.extend(&blob_data_1);
1610        all_blob_data.extend(&blob_data_2);
1611        all_blob_data.extend(&blob_data_3);
1612
1613        // Create the inner UInt8Array containing all blob data
1614        let values_array = ArrowUInt8Array::from(all_blob_data);
1615
1616        // Create the ListArray
1617        let list_array = ArrowListArray::new(
1618            arrow::datatypes::Field::new("item", arrow::datatypes::DataType::UInt8, false).into(),
1619            ArrowOffsetBuffer::from_lengths([blob_size_1, blob_size_2, blob_size_3]),
1620            std::sync::Arc::new(values_array),
1621            None,
1622        );
1623
1624        // Create component descriptor
1625        let blob_descriptor = ComponentDescriptor::partial("blob");
1626
1627        // Create component column
1628        let component_column = SerializedComponentColumn::new(list_array, blob_descriptor);
1629
1630        // Create the chunk manually with raw component data
1631        let chunk = Chunk::new(
1632            crate::ChunkId::new(),
1633            re_log_types::EntityPath::from(entity_path),
1634            Some(true), // is_sorted
1635            RowId::arrow_from_slice(&[row_id1, row_id2, row_id3]),
1636            std::iter::once((
1637                *Timeline::new_sequence("frame").name(),
1638                crate::TimeColumn::new_sequence("frame", [1, 2, 3]),
1639            ))
1640            .collect(),
1641            std::iter::once(component_column).collect(),
1642        )?;
1643
1644        let original_size = chunk.heap_size_bytes();
1645        eprintln!("Original chunk size: {original_size} bytes");
1646
1647        // Create 3 single-row slices
1648        let slice1 = chunk.row_sliced_deep(0, 1);
1649        let slice2 = chunk.row_sliced_deep(1, 1);
1650        let slice3 = chunk.row_sliced_deep(2, 1);
1651
1652        let slice1_size = slice1.heap_size_bytes();
1653        let slice2_size = slice2.heap_size_bytes();
1654        let slice3_size = slice3.heap_size_bytes();
1655
1656        eprintln!("Slice 1 size: {slice1_size} bytes ({blob_size_1} byte blob)");
1657        eprintln!("Slice 2 size: {slice2_size} bytes ({blob_size_2} byte blob)");
1658        eprintln!("Slice 3 size: {slice3_size} bytes ({blob_size_3} byte blob)");
1659
1660        let total_slice_size = slice1_size + slice2_size + slice3_size;
1661        eprintln!("Total slices size: {total_slice_size} bytes");
1662
1663        // The slices should add up to approximately the original size
1664        // We allow some overhead for metadata duplication (row IDs, timeline data, etc.)
1665        // but the component data should be accurately sliced
1666        let acceptable_overhead = 650; // bytes for metadata overhead (increased for raw arrays)
1667
1668        assert!(
1669            total_slice_size <= original_size + acceptable_overhead,
1670            "Slices total size ({total_slice_size}) should not exceed original size ({original_size}) by more than {acceptable_overhead} bytes of overhead",
1671        );
1672
1673        // Each slice should be proportional to its data size
1674        // The slice with 30KB should be larger than the slice with 10KB
1675        assert!(
1676            slice3_size > slice1_size,
1677            "Slice 3 with {blob_size_3} bytes ({slice3_size} total bytes) should be larger than slice 1 with {blob_size_1} bytes ({slice1_size} total bytes)",
1678        );
1679
1680        assert!(
1681            slice2_size > slice1_size,
1682            "Slice 2 with {blob_size_2} bytes ({slice2_size} total bytes) should be larger than slice 1 with {blob_size_1} bytes ({slice1_size} total bytes)",
1683        );
1684
1685        // Verify that the sliced data actually reflects the expected blob sizes
1686        // The component data size should be roughly proportional to the blob sizes
1687        let size_ratio_3_to_1 = slice3_size as f64 / slice1_size as f64;
1688        let expected_ratio_3_to_1 = blob_size_3 as f64 / blob_size_1 as f64; // 3.0
1689
1690        assert!(
1691            size_ratio_3_to_1 > 2.0 && size_ratio_3_to_1 < 4.0,
1692            "Size ratio between slice 3 and slice 1 ({size_ratio_3_to_1:.2}) should be close to expected blob ratio ({expected_ratio_3_to_1:.2})",
1693        );
1694
1695        eprintln!("✓ Raw arrow array slice memory calculation test passed!");
1696
1697        Ok(())
1698    }
1699}