Skip to main content

re_chunk/
shuffle.rs

1use arrow::array::{Array as ArrowArray, ListArray as ArrowListArray};
2use arrow::buffer::{OffsetBuffer as ArrowOffsets, ScalarBuffer as ArrowScalarBuffer};
3use itertools::Itertools as _;
4use re_log_types::TimelineName;
5
6use crate::{Chunk, ChunkId, TimeColumn};
7
8// ---
9
10impl Chunk {
11    /// Is the chunk currently ascendingly sorted by [`crate::RowId`]?
12    ///
13    /// This is O(1) (cached).
14    ///
15    /// See also [`Self::is_sorted_uncached`].
16    #[inline]
17    pub fn is_sorted(&self) -> bool {
18        self.is_sorted
19    }
20
21    /// For debugging purposes.
22    #[doc(hidden)]
23    #[inline]
24    pub fn is_sorted_uncached(&self) -> bool {
25        re_tracing::profile_function!();
26
27        self.row_ids()
28            .tuple_windows::<(_, _)>()
29            .all(|row_ids| row_ids.0 <= row_ids.1)
30    }
31
32    /// Is the chunk ascendingly sorted by time, for all of its timelines?
33    ///
34    /// This is O(1) (cached).
35    #[inline]
36    pub fn is_time_sorted(&self) -> bool {
37        self.timelines
38            .values()
39            .all(|time_column| time_column.is_sorted())
40    }
41
42    /// Is the chunk ascendingly sorted by time, for a specific timeline?
43    ///
44    /// This is O(1) (cached).
45    ///
46    /// See also [`Self::is_timeline_sorted_uncached`].
47    #[inline]
48    pub fn is_timeline_sorted(&self, timeline: &TimelineName) -> bool {
49        self.is_static()
50            || self
51                .timelines
52                .get(timeline)
53                .is_some_and(|time_column| time_column.is_sorted())
54    }
55
56    /// For debugging purposes.
57    #[doc(hidden)]
58    #[inline]
59    pub fn is_timeline_sorted_uncached(&self, timeline: &TimelineName) -> bool {
60        self.is_static()
61            || self
62                .timelines
63                .get(timeline)
64                .is_some_and(|time_column| time_column.is_sorted_uncached())
65    }
66
67    /// Sort the chunk, if needed.
68    ///
69    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
70    ///
71    /// If the chunk changes, it is given a new unique [`ChunkId`].
72    #[inline]
73    pub fn sort_if_unsorted(&mut self) {
74        if self.is_sorted() {
75            return;
76        }
77
78        re_tracing::profile_function!();
79
80        self.id = ChunkId::new();
81
82        #[cfg(not(target_arch = "wasm32"))]
83        let now = std::time::Instant::now();
84
85        let swaps = {
86            re_tracing::profile_scope!("swaps");
87            let row_ids = self.row_ids_slice();
88            let mut swaps = (0..row_ids.len()).collect::<Vec<_>>();
89            swaps.sort_by_key(|&i| row_ids[i]);
90            swaps
91        };
92
93        self.shuffle_with(&swaps);
94
95        #[cfg(not(target_arch = "wasm32"))]
96        re_log::trace!(
97            entity_path = %self.entity_path,
98            num_rows = self.row_ids.len(),
99            elapsed = ?now.elapsed(),
100            "chunk sorted",
101        );
102
103        #[cfg(debug_assertions)]
104        #[expect(clippy::unwrap_used)] // dev only
105        self.sanity_check().unwrap();
106    }
107
108    /// Returns a new [`Chunk`] that is sorted by `(<timeline>, RowId)`.
109    ///
110    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
111    ///
112    /// This is a no-op if the underlying timeline is already sorted appropriately (happy path).
113    ///
114    /// If the chunk is already sorted, the original [`crate::ChunkId`] is preserved.
115    /// Otherwise, the returned chunk gets a new unique [`crate::ChunkId`].
116    #[must_use]
117    pub fn sorted_by_timeline_if_unsorted(&self, timeline: &TimelineName) -> Self {
118        let Some(time_column) = self.timelines.get(timeline) else {
119            return self.clone_with_same_id();
120        };
121
122        if time_column.is_sorted() {
123            return self.clone_with_same_id();
124        }
125
126        let mut chunk = self.clone_with_new_id();
127
128        re_tracing::profile_function!();
129
130        #[cfg(not(target_arch = "wasm32"))]
131        let now = std::time::Instant::now();
132
133        let swaps = {
134            re_tracing::profile_scope!("swaps");
135            let row_ids = chunk.row_ids_slice();
136            let times = time_column.times_raw().to_vec();
137            let mut swaps = (0..times.len()).collect::<Vec<_>>();
138            swaps.sort_by_key(|&i| (times[i], row_ids[i]));
139            swaps
140        };
141
142        chunk.shuffle_with(&swaps);
143
144        #[cfg(not(target_arch = "wasm32"))]
145        re_log::trace!(
146            entity_path = %chunk.entity_path,
147            num_rows = chunk.row_ids.len(),
148            elapsed = ?now.elapsed(),
149            "chunk sorted",
150        );
151
152        #[cfg(debug_assertions)]
153        #[expect(clippy::unwrap_used)] // dev only
154        chunk.sanity_check().unwrap();
155
156        chunk
157    }
158
159    /// Randomly shuffles the chunk using the given `seed`.
160    ///
161    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
162    #[inline]
163    #[cfg(debug_assertions)] // only for tests
164    pub fn shuffle_random(&mut self, seed: u64) {
165        re_tracing::profile_function!();
166
167        #[cfg(not(target_arch = "wasm32"))]
168        let now = std::time::Instant::now();
169
170        use rand::SeedableRng as _;
171        use rand::seq::SliceRandom as _;
172        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
173
174        let swaps = {
175            re_tracing::profile_scope!("swaps");
176            let mut swaps = (0..self.row_ids.len()).collect::<Vec<_>>();
177            swaps.shuffle(&mut rng);
178            swaps
179        };
180
181        self.shuffle_with(&swaps);
182
183        #[cfg(not(target_arch = "wasm32"))]
184        re_log::trace!(
185            entity_path = %self.entity_path,
186            num_rows = self.row_ids.len(),
187            elapsed = ?now.elapsed(),
188            "chunk shuffled",
189        );
190    }
191
192    /// Shuffle the chunk according to the specified `swaps`.
193    ///
194    /// `swaps` is a slice that maps an implicit destination index to its explicit source index.
195    /// E.g. `swap[0] = 3` means that the entry at index `3` in the original chunk should be move to index `0`.
196    ///
197    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
198    //
199    // TODO(RR-3865): Use arrow's `ListView` to only shuffle offsets instead of the data itself.
200    pub(crate) fn shuffle_with(&mut self, swaps: &[usize]) {
201        re_tracing::profile_function!();
202
203        self.id = ChunkId::new();
204
205        // Row IDs
206        {
207            re_tracing::profile_scope!("row ids");
208
209            let row_ids = self.row_ids_slice();
210
211            let mut sorted_row_ids = row_ids.to_vec();
212            for (to, from) in swaps.iter().copied().enumerate() {
213                sorted_row_ids[to] = row_ids[from];
214            }
215            self.row_ids = re_types_core::RowId::arrow_from_slice(&sorted_row_ids);
216        }
217
218        let Self {
219            id: _,
220            entity_path: _,
221            heap_size_bytes: _,
222            is_sorted: _,
223            row_ids: _,
224            timelines,
225            components,
226        } = self;
227
228        // Timelines
229        {
230            re_tracing::profile_scope!("timelines");
231
232            for info in timelines.values_mut() {
233                let TimeColumn {
234                    timeline: _,
235                    times,
236                    is_sorted,
237                    time_range: _,
238                } = info;
239
240                let mut sorted = times.to_vec();
241                for (to, from) in swaps.iter().copied().enumerate() {
242                    sorted[to] = times[from];
243                }
244
245                *is_sorted = sorted.windows(2).all(|times| times[0] <= times[1]);
246                *times = ArrowScalarBuffer::from(sorted);
247            }
248        }
249
250        // Components
251        //
252        // Reminder: these are all `ListArray`s.
253        re_tracing::profile_scope!("components (offsets & data)");
254        {
255            for original in components.list_arrays_mut() {
256                let sorted_arrays = swaps
257                    .iter()
258                    .copied()
259                    .map(|from| original.value(from))
260                    .collect_vec();
261                let sorted_arrays = sorted_arrays
262                    .iter()
263                    .map(|array| &**array as &dyn ArrowArray)
264                    .collect_vec();
265
266                let datatype = original.data_type().clone();
267                let offsets =
268                    ArrowOffsets::from_lengths(sorted_arrays.iter().map(|array| array.len()));
269                #[expect(clippy::unwrap_used)] // these are slices of the same outer array
270                let values = re_arrow_util::concat_arrays(&sorted_arrays).unwrap();
271                let validity = original
272                    .nulls()
273                    .map(|validity| swaps.iter().map(|&from| validity.is_valid(from)).collect());
274
275                let field = match datatype {
276                    arrow::datatypes::DataType::List(field) => field.clone(),
277                    _ => unreachable!("This is always s list array"),
278                };
279                *original = ArrowListArray::new(field, offsets, values, validity);
280            }
281        }
282
283        self.is_sorted = self.is_sorted_uncached();
284    }
285}
286
287impl TimeColumn {
288    /// Is the timeline sorted?
289    ///
290    /// This is O(1) (cached).
291    ///
292    /// See also [`Self::is_sorted_uncached`].
293    #[inline]
294    pub fn is_sorted(&self) -> bool {
295        self.is_sorted
296    }
297
298    /// Like [`Self::is_sorted`], but actually checks the entire dataset rather than relying on the
299    /// cached value.
300    ///
301    /// O(n). Useful for tests/debugging, or when you just don't know.
302    ///
303    /// See also [`Self::is_sorted`].
304    #[inline]
305    pub fn is_sorted_uncached(&self) -> bool {
306        re_tracing::profile_function!();
307        self.times_raw()
308            .windows(2)
309            .all(|times| times[0] <= times[1])
310    }
311}
312
#[cfg(test)]
mod tests {
    use re_log_types::example_components::{MyColor, MyPoint, MyPoints};
    use re_log_types::{EntityPath, Timeline};
    use re_types_core::ComponentBatch as _;

    use super::*;
    use crate::{ChunkId, RowId};

    /// Shuffling a `RowId`-sorted chunk and sorting it again must give back the original chunk.
    #[test]
    fn sort() -> anyhow::Result<()> {
        let entity_path: EntityPath = "a/b/c".into();

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let points1 = vec![
            MyPoint::new(1.0, 2.0),
            MyPoint::new(3.0, 4.0),
            MyPoint::new(5.0, 6.0),
        ];
        let points3 = vec![MyPoint::new(10.0, 20.0)];
        let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];

        let colors1 = vec![
            MyColor::from_rgb(1, 2, 3),
            MyColor::from_rgb(4, 5, 6),
            MyColor::from_rgb(7, 8, 9),
        ];
        let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
        let colors4 = vec![
            MyColor::from_rgb(101, 102, 103),
            MyColor::from_rgb(104, 105, 106),
        ];

        {
            let chunk_sorted = Chunk::builder(entity_path.clone())
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1000), (timeline2, 42)],
                    [
                        (MyPoints::descriptor_points(), Some(&points1 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1001), (timeline2, 43)],
                    [
                        (MyPoints::descriptor_points(), None),
                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1002), (timeline2, 44)],
                    [
                        (MyPoints::descriptor_points(), Some(&points3 as _)),
                        (MyPoints::descriptor_colors(), None),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1003), (timeline2, 45)],
                    [
                        (MyPoints::descriptor_points(), Some(&points4 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                    ],
                )
                .build()?;

            eprintln!("{chunk_sorted}");

            assert!(chunk_sorted.is_sorted());
            assert!(chunk_sorted.is_sorted_uncached());

            let chunk_shuffled = {
                let mut chunk_shuffled = chunk_sorted.clone();
                chunk_shuffled.shuffle_random(666);
                chunk_shuffled
            };

            eprintln!("{chunk_shuffled}");

            assert!(!chunk_shuffled.is_sorted());
            assert!(!chunk_shuffled.is_sorted_uncached());
            assert_ne!(chunk_sorted, chunk_shuffled);

            let chunk_resorted = {
                let mut chunk_resorted = chunk_shuffled.clone();
                chunk_resorted.sort_if_unsorted();
                chunk_resorted
            };

            eprintln!("{chunk_resorted}");

            assert!(chunk_resorted.is_sorted());
            assert!(chunk_resorted.is_sorted_uncached());
            assert_eq!(chunk_sorted, chunk_resorted);
        }

        Ok(())
    }

    /// Sorting by a timeline that runs opposite to `RowId` order must reverse every column,
    /// leaving the other timeline and the `RowId`s unsorted.
    #[test]
    fn sort_time() -> anyhow::Result<()> {
        let entity_path: EntityPath = "a/b/c".into();

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let chunk_id = ChunkId::new();
        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();

        let points1 = vec![
            MyPoint::new(1.0, 2.0),
            MyPoint::new(3.0, 4.0),
            MyPoint::new(5.0, 6.0),
        ];
        let points3 = vec![MyPoint::new(10.0, 20.0)];
        let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];

        let colors1 = vec![
            MyColor::from_rgb(1, 2, 3),
            MyColor::from_rgb(4, 5, 6),
            MyColor::from_rgb(7, 8, 9),
        ];
        let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
        let colors4 = vec![
            MyColor::from_rgb(101, 102, 103),
            MyColor::from_rgb(104, 105, 106),
        ];

        {
            let chunk_unsorted_timeline2 = Chunk::builder_with_id(chunk_id, entity_path.clone())
                .with_sparse_component_batches(
                    row_id1,
                    [(timeline1, 1000), (timeline2, 45)],
                    [
                        (MyPoints::descriptor_points(), Some(&points1 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    row_id2,
                    [(timeline1, 1001), (timeline2, 44)],
                    [
                        (MyPoints::descriptor_points(), None),
                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    row_id3,
                    [(timeline1, 1002), (timeline2, 43)],
                    [
                        (MyPoints::descriptor_points(), Some(&points3 as _)),
                        (MyPoints::descriptor_colors(), None),
                    ],
                )
                .with_sparse_component_batches(
                    row_id4,
                    [(timeline1, 1003), (timeline2, 42)],
                    [
                        (MyPoints::descriptor_points(), Some(&points4 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                    ],
                )
                .build()?;

            eprintln!("unsorted:\n{chunk_unsorted_timeline2}");

            assert!(chunk_unsorted_timeline2.is_sorted());
            assert!(chunk_unsorted_timeline2.is_sorted_uncached());

            assert!(
                chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            assert!(
                !chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                !chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            let chunk_sorted_timeline2 =
                chunk_unsorted_timeline2.sorted_by_timeline_if_unsorted(timeline2.name());

            eprintln!("sorted:\n{chunk_sorted_timeline2}");

            assert!(!chunk_sorted_timeline2.is_sorted());
            assert!(!chunk_sorted_timeline2.is_sorted_uncached());

            assert!(
                !chunk_sorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                !chunk_sorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            assert!(
                chunk_sorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                chunk_sorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            let chunk_sorted_timeline2_expected =
                Chunk::builder_with_id(chunk_id, entity_path.clone())
                    .with_sparse_component_batches(
                        row_id4,
                        [(timeline1, 1003), (timeline2, 42)],
                        [
                            (MyPoints::descriptor_points(), Some(&points4 as _)),
                            (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id3,
                        [(timeline1, 1002), (timeline2, 43)],
                        [
                            (MyPoints::descriptor_points(), Some(&points3 as _)),
                            (MyPoints::descriptor_colors(), None),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id2,
                        [(timeline1, 1001), (timeline2, 44)],
                        [
                            (MyPoints::descriptor_points(), None),
                            (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id1,
                        [(timeline1, 1000), (timeline2, 45)],
                        [
                            (MyPoints::descriptor_points(), Some(&points1 as _)),
                            (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                        ],
                    )
                    .build()?;

            // Print the *expected* chunk here (this used to print the sorted chunk a second time).
            eprintln!("expected:\n{chunk_sorted_timeline2_expected}");

            // `SimpleDiff::from_str(left, right, left_label, right_label)`: the left side is the
            // expected chunk, so label it accordingly.
            assert_eq!(
                chunk_sorted_timeline2_expected,
                chunk_sorted_timeline2,
                "{}",
                similar_asserts::SimpleDiff::from_str(
                    &format!("{chunk_sorted_timeline2_expected}"),
                    &format!("{chunk_sorted_timeline2}"),
                    "expected",
                    "got",
                ),
            );
        }

        Ok(())
    }

    /// `Chunk::from_auto_row_ids` should reorder its inputs so that the time columns become
    /// sorted as well as possible, to avoid "out-of-order" chunks where some time columns are not sorted with respect to `RowId`.
    #[test]
    fn from_auto_row_ids_sorts_lexicographically() -> anyhow::Result<()> {
        let entity_path: EntityPath = "a/b/c".into();

        // Two timelines named such that "alpha" sorts before "beta".
        // Construct deliberately unsorted inputs:
        //   row | alpha | beta | color
        //   ----|-------|------|------
        //    0  |   2   |   5  |  100
        //    1  |   1   |   9  |  200
        //    2  |   1   |   7  |  300
        //    3  |   2   |   3  |  400
        //    4  |   1   |   9  |  500   (duplicate of row 1's key — tests stability)
        //
        // After lex-sort by (alpha, beta) the expected row order is:
        //   2 (1,7,300), 1 (1,9,200), 4 (1,9,500), 3 (2,3,400), 0 (2,5,100)
        let alpha = TimeColumn::new_sequence("alpha", [2_i64, 1, 1, 2, 1]);
        let beta = TimeColumn::new_sequence("beta", [5_i64, 9, 7, 3, 9]);

        let colors = vec![
            MyColor(100),
            MyColor(200),
            MyColor(300),
            MyColor(400),
            MyColor(500),
        ];
        let colors_array = colors.to_arrow_list_array()?;

        // `Chunk::from_columns` uses `Chunk::from_auto_row_ids`.
        let chunk = Chunk::from_columns(
            entity_path,
            [alpha, beta],
            [(MyPoints::descriptor_colors(), colors_array)],
        )?;

        eprintln!("{chunk}");

        assert!(chunk.is_sorted());
        assert!(chunk.is_sorted_uncached());

        let alpha = chunk.timelines().get(&"alpha".into()).unwrap();
        let beta = chunk.timelines().get(&"beta".into()).unwrap();

        // The primary timeline (alphabetically first) must be globally sorted; the secondary
        // one is only sorted within each primary-key group.
        assert!(alpha.is_sorted());
        assert!(!beta.is_sorted());

        assert_eq!(alpha.times_raw().to_vec(), vec![1, 1, 1, 2, 2]);
        assert_eq!(beta.times_raw().to_vec(), vec![7, 9, 9, 3, 5]);

        // Verify the components were permuted in lockstep with the time columns,
        // and that ties (rows 1 and 4) preserved their original order (stable sort).
        let got_colors: Vec<u32> = chunk
            .iter_slices::<u32>(MyPoints::descriptor_colors().component)
            .flat_map(<[u32]>::to_vec)
            .collect();
        assert_eq!(got_colors, vec![300, 200, 500, 400, 100]);

        // RowIds must be sequential ascending.
        let row_ids: Vec<_> = chunk.row_ids().collect();
        for w in row_ids.windows(2) {
            assert!(w[0] < w[1], "row_ids must be strictly ascending");
        }

        Ok(())
    }
}