Skip to main content

re_chunk/
shuffle.rs

1use arrow::array::{Array as ArrowArray, ListArray as ArrowListArray};
2use arrow::buffer::{OffsetBuffer as ArrowOffsets, ScalarBuffer as ArrowScalarBuffer};
3use itertools::Itertools as _;
4use re_log_types::TimelineName;
5
6use crate::{Chunk, TimeColumn};
7
8// ---
9
10impl Chunk {
11    /// Is the chunk currently ascendingly sorted by [`crate::RowId`]?
12    ///
13    /// This is O(1) (cached).
14    ///
15    /// See also [`Self::is_sorted_uncached`].
16    #[inline]
17    pub fn is_sorted(&self) -> bool {
18        self.is_sorted
19    }
20
21    /// For debugging purposes.
22    #[doc(hidden)]
23    #[inline]
24    pub fn is_sorted_uncached(&self) -> bool {
25        re_tracing::profile_function!();
26
27        self.row_ids()
28            .tuple_windows::<(_, _)>()
29            .all(|row_ids| row_ids.0 <= row_ids.1)
30    }
31
32    /// Is the chunk ascendingly sorted by time, for all of its timelines?
33    ///
34    /// This is O(1) (cached).
35    #[inline]
36    pub fn is_time_sorted(&self) -> bool {
37        self.timelines
38            .values()
39            .all(|time_column| time_column.is_sorted())
40    }
41
42    /// Is the chunk ascendingly sorted by time, for a specific timeline?
43    ///
44    /// This is O(1) (cached).
45    ///
46    /// See also [`Self::is_timeline_sorted_uncached`].
47    #[inline]
48    pub fn is_timeline_sorted(&self, timeline: &TimelineName) -> bool {
49        self.is_static()
50            || self
51                .timelines
52                .get(timeline)
53                .is_some_and(|time_column| time_column.is_sorted())
54    }
55
56    /// For debugging purposes.
57    #[doc(hidden)]
58    #[inline]
59    pub fn is_timeline_sorted_uncached(&self, timeline: &TimelineName) -> bool {
60        self.is_static()
61            || self
62                .timelines
63                .get(timeline)
64                .is_some_and(|time_column| time_column.is_sorted_uncached())
65    }
66
67    /// Sort the chunk, if needed.
68    ///
69    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
70    #[inline]
71    pub fn sort_if_unsorted(&mut self) {
72        if self.is_sorted() {
73            return;
74        }
75
76        re_tracing::profile_function!();
77
78        #[cfg(not(target_arch = "wasm32"))]
79        let now = std::time::Instant::now();
80
81        let swaps = {
82            re_tracing::profile_scope!("swaps");
83            let row_ids = self.row_ids_slice();
84            let mut swaps = (0..row_ids.len()).collect::<Vec<_>>();
85            swaps.sort_by_key(|&i| row_ids[i]);
86            swaps
87        };
88
89        self.shuffle_with(&swaps);
90
91        #[cfg(not(target_arch = "wasm32"))]
92        re_log::trace!(
93            entity_path = %self.entity_path,
94            num_rows = self.row_ids.len(),
95            elapsed = ?now.elapsed(),
96            "chunk sorted",
97        );
98
99        #[cfg(debug_assertions)]
100        #[expect(clippy::unwrap_used)] // dev only
101        self.sanity_check().unwrap();
102    }
103
104    /// Returns a new [`Chunk`] that is sorted by `(<timeline>, RowId)`.
105    ///
106    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
107    ///
108    /// This is a no-op if the underlying timeline is already sorted appropriately (happy path).
109    ///
110    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
111    #[must_use]
112    pub fn sorted_by_timeline_if_unsorted(&self, timeline: &TimelineName) -> Self {
113        let mut chunk = self.clone();
114
115        let Some(time_column) = chunk.timelines.get(timeline) else {
116            return chunk;
117        };
118
119        if time_column.is_sorted() {
120            return chunk;
121        }
122
123        re_tracing::profile_function!();
124
125        #[cfg(not(target_arch = "wasm32"))]
126        let now = std::time::Instant::now();
127
128        let swaps = {
129            re_tracing::profile_scope!("swaps");
130            let row_ids = chunk.row_ids_slice();
131            let times = time_column.times_raw().to_vec();
132            let mut swaps = (0..times.len()).collect::<Vec<_>>();
133            swaps.sort_by_key(|&i| (times[i], row_ids[i]));
134            swaps
135        };
136
137        chunk.shuffle_with(&swaps);
138
139        #[cfg(not(target_arch = "wasm32"))]
140        re_log::trace!(
141            entity_path = %chunk.entity_path,
142            num_rows = chunk.row_ids.len(),
143            elapsed = ?now.elapsed(),
144            "chunk sorted",
145        );
146
147        #[cfg(debug_assertions)]
148        #[expect(clippy::unwrap_used)] // dev only
149        chunk.sanity_check().unwrap();
150
151        chunk
152    }
153
154    /// Randomly shuffles the chunk using the given `seed`.
155    ///
156    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
157    #[inline]
158    pub fn shuffle_random(&mut self, seed: u64) {
159        re_tracing::profile_function!();
160
161        #[cfg(not(target_arch = "wasm32"))]
162        let now = std::time::Instant::now();
163
164        use rand::SeedableRng as _;
165        use rand::seq::SliceRandom as _;
166        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
167
168        let swaps = {
169            re_tracing::profile_scope!("swaps");
170            let mut swaps = (0..self.row_ids.len()).collect::<Vec<_>>();
171            swaps.shuffle(&mut rng);
172            swaps
173        };
174
175        self.shuffle_with(&swaps);
176
177        #[cfg(not(target_arch = "wasm32"))]
178        re_log::trace!(
179            entity_path = %self.entity_path,
180            num_rows = self.row_ids.len(),
181            elapsed = ?now.elapsed(),
182            "chunk shuffled",
183        );
184    }
185
186    /// Shuffle the chunk according to the specified `swaps`.
187    ///
188    /// `swaps` is a slice that maps an implicit destination index to its explicit source index.
189    /// E.g. `swap[0] = 3` means that the entry at index `3` in the original chunk should be move to index `0`.
190    ///
191    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
192    //
193    // TODO(RR-3865): Use arrow's `ListView` to only shuffle offsets instead of the data itself.
194    pub(crate) fn shuffle_with(&mut self, swaps: &[usize]) {
195        re_tracing::profile_function!();
196
197        // Row IDs
198        {
199            re_tracing::profile_scope!("row ids");
200
201            let row_ids = self.row_ids_slice();
202
203            let mut sorted_row_ids = row_ids.to_vec();
204            for (to, from) in swaps.iter().copied().enumerate() {
205                sorted_row_ids[to] = row_ids[from];
206            }
207            self.row_ids = re_types_core::RowId::arrow_from_slice(&sorted_row_ids);
208        }
209
210        let Self {
211            id: _,
212            entity_path: _,
213            heap_size_bytes: _,
214            is_sorted: _,
215            row_ids: _,
216            timelines,
217            components,
218        } = self;
219
220        // Timelines
221        {
222            re_tracing::profile_scope!("timelines");
223
224            for info in timelines.values_mut() {
225                let TimeColumn {
226                    timeline: _,
227                    times,
228                    is_sorted,
229                    time_range: _,
230                } = info;
231
232                let mut sorted = times.to_vec();
233                for (to, from) in swaps.iter().copied().enumerate() {
234                    sorted[to] = times[from];
235                }
236
237                *is_sorted = sorted.windows(2).all(|times| times[0] <= times[1]);
238                *times = ArrowScalarBuffer::from(sorted);
239            }
240        }
241
242        // Components
243        //
244        // Reminder: these are all `ListArray`s.
245        re_tracing::profile_scope!("components (offsets & data)");
246        {
247            for original in components.list_arrays_mut() {
248                let sorted_arrays = swaps
249                    .iter()
250                    .copied()
251                    .map(|from| original.value(from))
252                    .collect_vec();
253                let sorted_arrays = sorted_arrays
254                    .iter()
255                    .map(|array| &**array as &dyn ArrowArray)
256                    .collect_vec();
257
258                let datatype = original.data_type().clone();
259                let offsets =
260                    ArrowOffsets::from_lengths(sorted_arrays.iter().map(|array| array.len()));
261                #[expect(clippy::unwrap_used)] // these are slices of the same outer array
262                let values = re_arrow_util::concat_arrays(&sorted_arrays).unwrap();
263                let validity = original
264                    .nulls()
265                    .map(|validity| swaps.iter().map(|&from| validity.is_valid(from)).collect());
266
267                let field = match datatype {
268                    arrow::datatypes::DataType::List(field) => field.clone(),
269                    _ => unreachable!("This is always s list array"),
270                };
271                *original = ArrowListArray::new(field, offsets, values, validity);
272            }
273        }
274
275        self.is_sorted = self.is_sorted_uncached();
276    }
277}
278
279impl TimeColumn {
280    /// Is the timeline sorted?
281    ///
282    /// This is O(1) (cached).
283    ///
284    /// See also [`Self::is_sorted_uncached`].
285    #[inline]
286    pub fn is_sorted(&self) -> bool {
287        self.is_sorted
288    }
289
290    /// Like [`Self::is_sorted`], but actually checks the entire dataset rather than relying on the
291    /// cached value.
292    ///
293    /// O(n). Useful for tests/debugging, or when you just don't know.
294    ///
295    /// See also [`Self::is_sorted`].
296    #[inline]
297    pub fn is_sorted_uncached(&self) -> bool {
298        re_tracing::profile_function!();
299        self.times_raw()
300            .windows(2)
301            .all(|times| times[0] <= times[1])
302    }
303}
304
305#[cfg(test)]
306mod tests {
307    use re_log_types::example_components::{MyColor, MyPoint, MyPoints};
308    use re_log_types::{EntityPath, Timeline};
309
310    use super::*;
311    use crate::{ChunkId, RowId};
312
313    #[test]
314    fn sort() -> anyhow::Result<()> {
315        let entity_path: EntityPath = "a/b/c".into();
316
317        let timeline1 = Timeline::new_duration("log_time");
318        let timeline2 = Timeline::new_sequence("frame_nr");
319
320        let points1 = vec![
321            MyPoint::new(1.0, 2.0),
322            MyPoint::new(3.0, 4.0),
323            MyPoint::new(5.0, 6.0),
324        ];
325        let points3 = vec![MyPoint::new(10.0, 20.0)];
326        let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];
327
328        let colors1 = vec![
329            MyColor::from_rgb(1, 2, 3),
330            MyColor::from_rgb(4, 5, 6),
331            MyColor::from_rgb(7, 8, 9),
332        ];
333        let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
334        let colors4 = vec![
335            MyColor::from_rgb(101, 102, 103),
336            MyColor::from_rgb(104, 105, 106),
337        ];
338
339        {
340            let chunk_sorted = Chunk::builder(entity_path.clone())
341                .with_sparse_component_batches(
342                    RowId::new(),
343                    [(timeline1, 1000), (timeline2, 42)],
344                    [
345                        (MyPoints::descriptor_points(), Some(&points1 as _)),
346                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
347                    ],
348                )
349                .with_sparse_component_batches(
350                    RowId::new(),
351                    [(timeline1, 1001), (timeline2, 43)],
352                    [
353                        (MyPoints::descriptor_points(), None),
354                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
355                    ],
356                )
357                .with_sparse_component_batches(
358                    RowId::new(),
359                    [(timeline1, 1002), (timeline2, 44)],
360                    [
361                        (MyPoints::descriptor_points(), Some(&points3 as _)),
362                        (MyPoints::descriptor_colors(), None),
363                    ],
364                )
365                .with_sparse_component_batches(
366                    RowId::new(),
367                    [(timeline1, 1003), (timeline2, 45)],
368                    [
369                        (MyPoints::descriptor_points(), Some(&points4 as _)),
370                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
371                    ],
372                )
373                .build()?;
374
375            eprintln!("{chunk_sorted}");
376
377            assert!(chunk_sorted.is_sorted());
378            assert!(chunk_sorted.is_sorted_uncached());
379
380            let chunk_shuffled = {
381                let mut chunk_shuffled = chunk_sorted.clone();
382                chunk_shuffled.shuffle_random(666);
383                chunk_shuffled
384            };
385
386            eprintln!("{chunk_shuffled}");
387
388            assert!(!chunk_shuffled.is_sorted());
389            assert!(!chunk_shuffled.is_sorted_uncached());
390            assert_ne!(chunk_sorted, chunk_shuffled);
391
392            let chunk_resorted = {
393                let mut chunk_resorted = chunk_shuffled.clone();
394                chunk_resorted.sort_if_unsorted();
395                chunk_resorted
396            };
397
398            eprintln!("{chunk_resorted}");
399
400            assert!(chunk_resorted.is_sorted());
401            assert!(chunk_resorted.is_sorted_uncached());
402            assert_eq!(chunk_sorted, chunk_resorted);
403        }
404
405        Ok(())
406    }
407
408    #[test]
409    fn sort_time() -> anyhow::Result<()> {
410        let entity_path: EntityPath = "a/b/c".into();
411
412        let timeline1 = Timeline::new_duration("log_time");
413        let timeline2 = Timeline::new_sequence("frame_nr");
414
415        let chunk_id = ChunkId::new();
416        let row_id1 = RowId::new();
417        let row_id2 = RowId::new();
418        let row_id3 = RowId::new();
419        let row_id4 = RowId::new();
420
421        let points1 = vec![
422            MyPoint::new(1.0, 2.0),
423            MyPoint::new(3.0, 4.0),
424            MyPoint::new(5.0, 6.0),
425        ];
426        let points3 = vec![MyPoint::new(10.0, 20.0)];
427        let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];
428
429        let colors1 = vec![
430            MyColor::from_rgb(1, 2, 3),
431            MyColor::from_rgb(4, 5, 6),
432            MyColor::from_rgb(7, 8, 9),
433        ];
434        let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
435        let colors4 = vec![
436            MyColor::from_rgb(101, 102, 103),
437            MyColor::from_rgb(104, 105, 106),
438        ];
439
440        {
441            let chunk_unsorted_timeline2 = Chunk::builder_with_id(chunk_id, entity_path.clone())
442                .with_sparse_component_batches(
443                    row_id1,
444                    [(timeline1, 1000), (timeline2, 45)],
445                    [
446                        (MyPoints::descriptor_points(), Some(&points1 as _)),
447                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
448                    ],
449                )
450                .with_sparse_component_batches(
451                    row_id2,
452                    [(timeline1, 1001), (timeline2, 44)],
453                    [
454                        (MyPoints::descriptor_points(), None),
455                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
456                    ],
457                )
458                .with_sparse_component_batches(
459                    row_id3,
460                    [(timeline1, 1002), (timeline2, 43)],
461                    [
462                        (MyPoints::descriptor_points(), Some(&points3 as _)),
463                        (MyPoints::descriptor_colors(), None),
464                    ],
465                )
466                .with_sparse_component_batches(
467                    row_id4,
468                    [(timeline1, 1003), (timeline2, 42)],
469                    [
470                        (MyPoints::descriptor_points(), Some(&points4 as _)),
471                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
472                    ],
473                )
474                .build()?;
475
476            eprintln!("unsorted:\n{chunk_unsorted_timeline2}");
477
478            assert!(chunk_unsorted_timeline2.is_sorted());
479            assert!(chunk_unsorted_timeline2.is_sorted_uncached());
480
481            assert!(
482                chunk_unsorted_timeline2
483                    .timelines()
484                    .get(timeline1.name())
485                    .unwrap()
486                    .is_sorted()
487            );
488            assert!(
489                chunk_unsorted_timeline2
490                    .timelines()
491                    .get(timeline1.name())
492                    .unwrap()
493                    .is_sorted_uncached()
494            );
495
496            assert!(
497                !chunk_unsorted_timeline2
498                    .timelines()
499                    .get(timeline2.name())
500                    .unwrap()
501                    .is_sorted()
502            );
503            assert!(
504                !chunk_unsorted_timeline2
505                    .timelines()
506                    .get(timeline2.name())
507                    .unwrap()
508                    .is_sorted_uncached()
509            );
510
511            let chunk_sorted_timeline2 =
512                chunk_unsorted_timeline2.sorted_by_timeline_if_unsorted(timeline2.name());
513
514            eprintln!("sorted:\n{chunk_sorted_timeline2}");
515
516            assert!(!chunk_sorted_timeline2.is_sorted());
517            assert!(!chunk_sorted_timeline2.is_sorted_uncached());
518
519            assert!(
520                !chunk_sorted_timeline2
521                    .timelines()
522                    .get(timeline1.name())
523                    .unwrap()
524                    .is_sorted()
525            );
526            assert!(
527                !chunk_sorted_timeline2
528                    .timelines()
529                    .get(timeline1.name())
530                    .unwrap()
531                    .is_sorted_uncached()
532            );
533
534            assert!(
535                chunk_sorted_timeline2
536                    .timelines()
537                    .get(timeline2.name())
538                    .unwrap()
539                    .is_sorted()
540            );
541            assert!(
542                chunk_sorted_timeline2
543                    .timelines()
544                    .get(timeline2.name())
545                    .unwrap()
546                    .is_sorted_uncached()
547            );
548
549            let chunk_sorted_timeline2_expected =
550                Chunk::builder_with_id(chunk_id, entity_path.clone())
551                    .with_sparse_component_batches(
552                        row_id4,
553                        [(timeline1, 1003), (timeline2, 42)],
554                        [
555                            (MyPoints::descriptor_points(), Some(&points4 as _)),
556                            (MyPoints::descriptor_colors(), Some(&colors4 as _)),
557                        ],
558                    )
559                    .with_sparse_component_batches(
560                        row_id3,
561                        [(timeline1, 1002), (timeline2, 43)],
562                        [
563                            (MyPoints::descriptor_points(), Some(&points3 as _)),
564                            (MyPoints::descriptor_colors(), None),
565                        ],
566                    )
567                    .with_sparse_component_batches(
568                        row_id2,
569                        [(timeline1, 1001), (timeline2, 44)],
570                        [
571                            (MyPoints::descriptor_points(), None),
572                            (MyPoints::descriptor_colors(), Some(&colors2 as _)),
573                        ],
574                    )
575                    .with_sparse_component_batches(
576                        row_id1,
577                        [(timeline1, 1000), (timeline2, 45)],
578                        [
579                            (MyPoints::descriptor_points(), Some(&points1 as _)),
580                            (MyPoints::descriptor_colors(), Some(&colors1 as _)),
581                        ],
582                    )
583                    .build()?;
584
585            eprintln!("expected:\n{chunk_sorted_timeline2}");
586
587            assert_eq!(
588                chunk_sorted_timeline2_expected,
589                chunk_sorted_timeline2,
590                "{}",
591                similar_asserts::SimpleDiff::from_str(
592                    &format!("{chunk_sorted_timeline2_expected}"),
593                    &format!("{chunk_sorted_timeline2}"),
594                    "got",
595                    "expected",
596                ),
597            );
598        }
599
600        Ok(())
601    }
602}