re_chunk/
shuffle.rs

1use arrow::{
2    array::{Array as ArrowArray, ListArray as ArrowListArray},
3    buffer::{OffsetBuffer as ArrowOffsets, ScalarBuffer as ArrowScalarBuffer},
4};
5use itertools::Itertools as _;
6
7use re_log_types::TimelineName;
8
9use crate::{Chunk, TimeColumn};
10
11// ---
12
13impl Chunk {
14    /// Is the chunk currently ascendingly sorted by [`crate::RowId`]?
15    ///
16    /// This is O(1) (cached).
17    ///
18    /// See also [`Self::is_sorted_uncached`].
19    #[inline]
20    pub fn is_sorted(&self) -> bool {
21        self.is_sorted
22    }
23
24    /// For debugging purposes.
25    #[doc(hidden)]
26    #[inline]
27    pub fn is_sorted_uncached(&self) -> bool {
28        re_tracing::profile_function!();
29
30        self.row_ids()
31            .tuple_windows::<(_, _)>()
32            .all(|row_ids| row_ids.0 <= row_ids.1)
33    }
34
35    /// Is the chunk ascendingly sorted by time, for all of its timelines?
36    ///
37    /// This is O(1) (cached).
38    #[inline]
39    pub fn is_time_sorted(&self) -> bool {
40        self.timelines
41            .values()
42            .all(|time_column| time_column.is_sorted())
43    }
44
45    /// Is the chunk ascendingly sorted by time, for a specific timeline?
46    ///
47    /// This is O(1) (cached).
48    ///
49    /// See also [`Self::is_timeline_sorted_uncached`].
50    #[inline]
51    pub fn is_timeline_sorted(&self, timeline: &TimelineName) -> bool {
52        self.is_static()
53            || self
54                .timelines
55                .get(timeline)
56                .is_some_and(|time_column| time_column.is_sorted())
57    }
58
59    /// For debugging purposes.
60    #[doc(hidden)]
61    #[inline]
62    pub fn is_timeline_sorted_uncached(&self, timeline: &TimelineName) -> bool {
63        self.is_static()
64            || self
65                .timelines
66                .get(timeline)
67                .is_some_and(|time_column| time_column.is_sorted_uncached())
68    }
69
70    /// Sort the chunk, if needed.
71    ///
72    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
73    #[inline]
74    pub fn sort_if_unsorted(&mut self) {
75        if self.is_sorted() {
76            return;
77        }
78
79        re_tracing::profile_function!();
80
81        #[cfg(not(target_arch = "wasm32"))]
82        let now = std::time::Instant::now();
83
84        let swaps = {
85            re_tracing::profile_scope!("swaps");
86            let row_ids = self.row_ids_slice();
87            let mut swaps = (0..row_ids.len()).collect::<Vec<_>>();
88            swaps.sort_by_key(|&i| row_ids[i]);
89            swaps
90        };
91
92        self.shuffle_with(&swaps);
93
94        #[cfg(not(target_arch = "wasm32"))]
95        re_log::trace!(
96            entity_path = %self.entity_path,
97            num_rows = self.row_ids.len(),
98            elapsed = ?now.elapsed(),
99            "chunk sorted",
100        );
101
102        #[cfg(debug_assertions)]
103        #[expect(clippy::unwrap_used)] // dev only
104        self.sanity_check().unwrap();
105    }
106
107    /// Returns a new [`Chunk`] that is sorted by `(<timeline>, RowId)`.
108    ///
109    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
110    ///
111    /// This is a no-op if the underlying timeline is already sorted appropriately (happy path).
112    ///
113    /// WARNING: the returned chunk has the same old [`crate::ChunkId`]! Change it with [`Self::with_id`].
114    #[must_use]
115    pub fn sorted_by_timeline_if_unsorted(&self, timeline: &TimelineName) -> Self {
116        let mut chunk = self.clone();
117
118        let Some(time_column) = chunk.timelines.get(timeline) else {
119            return chunk;
120        };
121
122        if time_column.is_sorted() {
123            return chunk;
124        }
125
126        re_tracing::profile_function!();
127
128        #[cfg(not(target_arch = "wasm32"))]
129        let now = std::time::Instant::now();
130
131        let swaps = {
132            re_tracing::profile_scope!("swaps");
133            let row_ids = chunk.row_ids_slice();
134            let times = time_column.times_raw().to_vec();
135            let mut swaps = (0..times.len()).collect::<Vec<_>>();
136            swaps.sort_by_key(|&i| (times[i], row_ids[i]));
137            swaps
138        };
139
140        chunk.shuffle_with(&swaps);
141
142        #[cfg(not(target_arch = "wasm32"))]
143        re_log::trace!(
144            entity_path = %chunk.entity_path,
145            num_rows = chunk.row_ids.len(),
146            elapsed = ?now.elapsed(),
147            "chunk sorted",
148        );
149
150        #[cfg(debug_assertions)]
151        #[expect(clippy::unwrap_used)] // dev only
152        chunk.sanity_check().unwrap();
153
154        chunk
155    }
156
157    /// Randomly shuffles the chunk using the given `seed`.
158    ///
159    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
160    #[inline]
161    pub fn shuffle_random(&mut self, seed: u64) {
162        re_tracing::profile_function!();
163
164        #[cfg(not(target_arch = "wasm32"))]
165        let now = std::time::Instant::now();
166
167        use rand::{SeedableRng as _, seq::SliceRandom as _};
168        let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
169
170        let swaps = {
171            re_tracing::profile_scope!("swaps");
172            let mut swaps = (0..self.row_ids.len()).collect::<Vec<_>>();
173            swaps.shuffle(&mut rng);
174            swaps
175        };
176
177        self.shuffle_with(&swaps);
178
179        #[cfg(not(target_arch = "wasm32"))]
180        re_log::trace!(
181            entity_path = %self.entity_path,
182            num_rows = self.row_ids.len(),
183            elapsed = ?now.elapsed(),
184            "chunk shuffled",
185        );
186    }
187
188    /// Shuffle the chunk according to the specified `swaps`.
189    ///
190    /// `swaps` is a slice that maps an implicit destination index to its explicit source index.
191    /// E.g. `swap[0] = 3` means that the entry at index `3` in the original chunk should be move to index `0`.
192    ///
193    /// The underlying arrow data will be copied and shuffled in memory in order to make it contiguous.
194    //
195    // TODO(apache/arrow-rs#5375): Provide a path that only shuffles offsets instead of the data itself, using a `ListView`.
196    pub(crate) fn shuffle_with(&mut self, swaps: &[usize]) {
197        re_tracing::profile_function!();
198
199        // Row IDs
200        {
201            re_tracing::profile_scope!("row ids");
202
203            let row_ids = self.row_ids_slice();
204
205            let mut sorted_row_ids = row_ids.to_vec();
206            for (to, from) in swaps.iter().copied().enumerate() {
207                sorted_row_ids[to] = row_ids[from];
208            }
209            self.row_ids = re_types_core::RowId::arrow_from_slice(&sorted_row_ids);
210        }
211
212        let Self {
213            id: _,
214            entity_path: _,
215            heap_size_bytes: _,
216            is_sorted: _,
217            row_ids: _,
218            timelines,
219            components,
220        } = self;
221
222        // Timelines
223        {
224            re_tracing::profile_scope!("timelines");
225
226            for info in timelines.values_mut() {
227                let TimeColumn {
228                    timeline: _,
229                    times,
230                    is_sorted,
231                    time_range: _,
232                } = info;
233
234                let mut sorted = times.to_vec();
235                for (to, from) in swaps.iter().copied().enumerate() {
236                    sorted[to] = times[from];
237                }
238
239                *is_sorted = sorted.windows(2).all(|times| times[0] <= times[1]);
240                *times = ArrowScalarBuffer::from(sorted);
241            }
242        }
243
244        // Components
245        //
246        // Reminder: these are all `ListArray`s.
247        re_tracing::profile_scope!("components (offsets & data)");
248        {
249            for original in components.values_mut() {
250                let sorted_arrays = swaps
251                    .iter()
252                    .copied()
253                    .map(|from| original.value(from))
254                    .collect_vec();
255                let sorted_arrays = sorted_arrays
256                    .iter()
257                    .map(|array| &**array as &dyn ArrowArray)
258                    .collect_vec();
259
260                let datatype = original.data_type().clone();
261                let offsets =
262                    ArrowOffsets::from_lengths(sorted_arrays.iter().map(|array| array.len()));
263                #[expect(clippy::unwrap_used)] // these are slices of the same outer array
264                let values = re_arrow_util::concat_arrays(&sorted_arrays).unwrap();
265                let validity = original
266                    .nulls()
267                    .map(|validity| swaps.iter().map(|&from| validity.is_valid(from)).collect());
268
269                let field = match datatype {
270                    arrow::datatypes::DataType::List(field) => field.clone(),
271                    _ => unreachable!("This is always s list array"),
272                };
273                *original = ArrowListArray::new(field, offsets, values, validity);
274            }
275        }
276
277        self.is_sorted = self.is_sorted_uncached();
278    }
279}
280
281impl TimeColumn {
282    /// Is the timeline sorted?
283    ///
284    /// This is O(1) (cached).
285    ///
286    /// See also [`Self::is_sorted_uncached`].
287    #[inline]
288    pub fn is_sorted(&self) -> bool {
289        self.is_sorted
290    }
291
292    /// Like [`Self::is_sorted`], but actually checks the entire dataset rather than relying on the
293    /// cached value.
294    ///
295    /// O(n). Useful for tests/debugging, or when you just don't know.
296    ///
297    /// See also [`Self::is_sorted`].
298    #[inline]
299    pub fn is_sorted_uncached(&self) -> bool {
300        re_tracing::profile_function!();
301        self.times_raw()
302            .windows(2)
303            .all(|times| times[0] <= times[1])
304    }
305}
306
#[cfg(test)]
mod tests {
    use re_log_types::{
        EntityPath, Timeline,
        example_components::{MyColor, MyPoint, MyPoints},
    };

    use crate::{ChunkId, RowId};

    use super::*;

    /// Builds a sorted chunk, shuffles it with a fixed seed, sorts it again, and checks that
    /// the result is identical to the chunk we started from.
    #[test]
    fn sort() -> anyhow::Result<()> {
        let entity_path: EntityPath = "a/b/c".into();

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let points1 = vec![
            MyPoint::new(1.0, 2.0),
            MyPoint::new(3.0, 4.0),
            MyPoint::new(5.0, 6.0),
        ];
        let points3 = vec![MyPoint::new(10.0, 20.0)];
        let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];

        let colors1 = vec![
            MyColor::from_rgb(1, 2, 3),
            MyColor::from_rgb(4, 5, 6),
            MyColor::from_rgb(7, 8, 9),
        ];
        let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
        let colors4 = vec![
            MyColor::from_rgb(101, 102, 103),
            MyColor::from_rgb(104, 105, 106),
        ];

        {
            // Row IDs are generated in insertion order, and the test asserts below that the
            // chunk starts out sorted.
            let chunk_sorted = Chunk::builder(entity_path.clone())
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1000), (timeline2, 42)],
                    [
                        (MyPoints::descriptor_points(), Some(&points1 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1001), (timeline2, 43)],
                    [
                        (MyPoints::descriptor_points(), None),
                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1002), (timeline2, 44)],
                    [
                        (MyPoints::descriptor_points(), Some(&points3 as _)),
                        (MyPoints::descriptor_colors(), None),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1003), (timeline2, 45)],
                    [
                        (MyPoints::descriptor_points(), Some(&points4 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                    ],
                )
                .build()?;

            eprintln!("{chunk_sorted}");

            assert!(chunk_sorted.is_sorted());
            assert!(chunk_sorted.is_sorted_uncached());

            let chunk_shuffled = {
                let mut chunk_shuffled = chunk_sorted.clone();
                chunk_shuffled.shuffle_random(666);
                chunk_shuffled
            };

            eprintln!("{chunk_shuffled}");

            assert!(!chunk_shuffled.is_sorted());
            assert!(!chunk_shuffled.is_sorted_uncached());
            assert_ne!(chunk_sorted, chunk_shuffled);

            let chunk_resorted = {
                let mut chunk_resorted = chunk_shuffled.clone();
                chunk_resorted.sort_if_unsorted();
                chunk_resorted
            };

            eprintln!("{chunk_resorted}");

            assert!(chunk_resorted.is_sorted());
            assert!(chunk_resorted.is_sorted_uncached());
            assert_eq!(chunk_sorted, chunk_resorted);
        }

        Ok(())
    }

    /// Builds a chunk whose second timeline runs backwards, sorts it by that timeline, and
    /// compares the result against a chunk built with the rows in the expected order.
    #[test]
    fn sort_time() -> anyhow::Result<()> {
        let entity_path: EntityPath = "a/b/c".into();

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let chunk_id = ChunkId::new();
        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();

        let points1 = vec![
            MyPoint::new(1.0, 2.0),
            MyPoint::new(3.0, 4.0),
            MyPoint::new(5.0, 6.0),
        ];
        let points3 = vec![MyPoint::new(10.0, 20.0)];
        let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];

        let colors1 = vec![
            MyColor::from_rgb(1, 2, 3),
            MyColor::from_rgb(4, 5, 6),
            MyColor::from_rgb(7, 8, 9),
        ];
        let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
        let colors4 = vec![
            MyColor::from_rgb(101, 102, 103),
            MyColor::from_rgb(104, 105, 106),
        ];

        {
            // `timeline1` goes up (1000..=1003) while `timeline2` goes down (45..=42).
            let chunk_unsorted_timeline2 = Chunk::builder_with_id(chunk_id, entity_path.clone())
                .with_sparse_component_batches(
                    row_id1,
                    [(timeline1, 1000), (timeline2, 45)],
                    [
                        (MyPoints::descriptor_points(), Some(&points1 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    row_id2,
                    [(timeline1, 1001), (timeline2, 44)],
                    [
                        (MyPoints::descriptor_points(), None),
                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    row_id3,
                    [(timeline1, 1002), (timeline2, 43)],
                    [
                        (MyPoints::descriptor_points(), Some(&points3 as _)),
                        (MyPoints::descriptor_colors(), None),
                    ],
                )
                .with_sparse_component_batches(
                    row_id4,
                    [(timeline1, 1003), (timeline2, 42)],
                    [
                        (MyPoints::descriptor_points(), Some(&points4 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                    ],
                )
                .build()?;

            eprintln!("unsorted:\n{chunk_unsorted_timeline2}");

            assert!(chunk_unsorted_timeline2.is_sorted());
            assert!(chunk_unsorted_timeline2.is_sorted_uncached());

            assert!(
                chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            assert!(
                !chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                !chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            let chunk_sorted_timeline2 =
                chunk_unsorted_timeline2.sorted_by_timeline_if_unsorted(timeline2.name());

            eprintln!("sorted:\n{chunk_sorted_timeline2}");

            // Sorting by `timeline2` reverses the rows, which in turn unsorts both the row IDs
            // and `timeline1`.
            assert!(!chunk_sorted_timeline2.is_sorted());
            assert!(!chunk_sorted_timeline2.is_sorted_uncached());

            assert!(
                !chunk_sorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                !chunk_sorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            assert!(
                chunk_sorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                chunk_sorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            let chunk_sorted_timeline2_expected =
                Chunk::builder_with_id(chunk_id, entity_path.clone())
                    .with_sparse_component_batches(
                        row_id4,
                        [(timeline1, 1003), (timeline2, 42)],
                        [
                            (MyPoints::descriptor_points(), Some(&points4 as _)),
                            (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id3,
                        [(timeline1, 1002), (timeline2, 43)],
                        [
                            (MyPoints::descriptor_points(), Some(&points3 as _)),
                            (MyPoints::descriptor_colors(), None),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id2,
                        [(timeline1, 1001), (timeline2, 44)],
                        [
                            (MyPoints::descriptor_points(), None),
                            (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id1,
                        [(timeline1, 1000), (timeline2, 45)],
                        [
                            (MyPoints::descriptor_points(), Some(&points1 as _)),
                            (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                        ],
                    )
                    .build()?;

            // Print the expected chunk itself, not the actual one a second time.
            eprintln!("expected:\n{chunk_sorted_timeline2_expected}");

            assert_eq!(
                chunk_sorted_timeline2_expected,
                chunk_sorted_timeline2,
                "{}",
                // Labels are given in the same order as the operands: left, then right.
                similar_asserts::SimpleDiff::from_str(
                    &format!("{chunk_sorted_timeline2_expected}"),
                    &format!("{chunk_sorted_timeline2}"),
                    "expected",
                    "got",
                ),
            );
        }

        Ok(())
    }
}