1use arrow::array::{Array as ArrowArray, ListArray as ArrowListArray};
2use arrow::buffer::{OffsetBuffer as ArrowOffsets, ScalarBuffer as ArrowScalarBuffer};
3use itertools::Itertools as _;
4use re_log_types::TimelineName;
5
6use crate::{Chunk, ChunkId, TimeColumn};
7
8impl Chunk {
11 #[inline]
17 pub fn is_sorted(&self) -> bool {
18 self.is_sorted
19 }
20
21 #[doc(hidden)]
23 #[inline]
24 pub fn is_sorted_uncached(&self) -> bool {
25 re_tracing::profile_function!();
26
27 self.row_ids()
28 .tuple_windows::<(_, _)>()
29 .all(|row_ids| row_ids.0 <= row_ids.1)
30 }
31
32 #[inline]
36 pub fn is_time_sorted(&self) -> bool {
37 self.timelines
38 .values()
39 .all(|time_column| time_column.is_sorted())
40 }
41
42 #[inline]
48 pub fn is_timeline_sorted(&self, timeline: &TimelineName) -> bool {
49 self.is_static()
50 || self
51 .timelines
52 .get(timeline)
53 .is_some_and(|time_column| time_column.is_sorted())
54 }
55
56 #[doc(hidden)]
58 #[inline]
59 pub fn is_timeline_sorted_uncached(&self, timeline: &TimelineName) -> bool {
60 self.is_static()
61 || self
62 .timelines
63 .get(timeline)
64 .is_some_and(|time_column| time_column.is_sorted_uncached())
65 }
66
67 #[inline]
73 pub fn sort_if_unsorted(&mut self) {
74 if self.is_sorted() {
75 return;
76 }
77
78 re_tracing::profile_function!();
79
80 self.id = ChunkId::new();
81
82 #[cfg(not(target_arch = "wasm32"))]
83 let now = std::time::Instant::now();
84
85 let swaps = {
86 re_tracing::profile_scope!("swaps");
87 let row_ids = self.row_ids_slice();
88 let mut swaps = (0..row_ids.len()).collect::<Vec<_>>();
89 swaps.sort_by_key(|&i| row_ids[i]);
90 swaps
91 };
92
93 self.shuffle_with(&swaps);
94
95 #[cfg(not(target_arch = "wasm32"))]
96 re_log::trace!(
97 entity_path = %self.entity_path,
98 num_rows = self.row_ids.len(),
99 elapsed = ?now.elapsed(),
100 "chunk sorted",
101 );
102
103 #[cfg(debug_assertions)]
104 #[expect(clippy::unwrap_used)] self.sanity_check().unwrap();
106 }
107
108 #[must_use]
117 pub fn sorted_by_timeline_if_unsorted(&self, timeline: &TimelineName) -> Self {
118 let Some(time_column) = self.timelines.get(timeline) else {
119 return self.clone_with_same_id();
120 };
121
122 if time_column.is_sorted() {
123 return self.clone_with_same_id();
124 }
125
126 let mut chunk = self.clone_with_new_id();
127
128 re_tracing::profile_function!();
129
130 #[cfg(not(target_arch = "wasm32"))]
131 let now = std::time::Instant::now();
132
133 let swaps = {
134 re_tracing::profile_scope!("swaps");
135 let row_ids = chunk.row_ids_slice();
136 let times = time_column.times_raw().to_vec();
137 let mut swaps = (0..times.len()).collect::<Vec<_>>();
138 swaps.sort_by_key(|&i| (times[i], row_ids[i]));
139 swaps
140 };
141
142 chunk.shuffle_with(&swaps);
143
144 #[cfg(not(target_arch = "wasm32"))]
145 re_log::trace!(
146 entity_path = %chunk.entity_path,
147 num_rows = chunk.row_ids.len(),
148 elapsed = ?now.elapsed(),
149 "chunk sorted",
150 );
151
152 #[cfg(debug_assertions)]
153 #[expect(clippy::unwrap_used)] chunk.sanity_check().unwrap();
155
156 chunk
157 }
158
159 #[inline]
163 #[cfg(debug_assertions)] pub fn shuffle_random(&mut self, seed: u64) {
165 re_tracing::profile_function!();
166
167 #[cfg(not(target_arch = "wasm32"))]
168 let now = std::time::Instant::now();
169
170 use rand::SeedableRng as _;
171 use rand::seq::SliceRandom as _;
172 let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
173
174 let swaps = {
175 re_tracing::profile_scope!("swaps");
176 let mut swaps = (0..self.row_ids.len()).collect::<Vec<_>>();
177 swaps.shuffle(&mut rng);
178 swaps
179 };
180
181 self.shuffle_with(&swaps);
182
183 #[cfg(not(target_arch = "wasm32"))]
184 re_log::trace!(
185 entity_path = %self.entity_path,
186 num_rows = self.row_ids.len(),
187 elapsed = ?now.elapsed(),
188 "chunk shuffled",
189 );
190 }
191
192 pub(crate) fn shuffle_with(&mut self, swaps: &[usize]) {
201 re_tracing::profile_function!();
202
203 self.id = ChunkId::new();
204
205 {
207 re_tracing::profile_scope!("row ids");
208
209 let row_ids = self.row_ids_slice();
210
211 let mut sorted_row_ids = row_ids.to_vec();
212 for (to, from) in swaps.iter().copied().enumerate() {
213 sorted_row_ids[to] = row_ids[from];
214 }
215 self.row_ids = re_types_core::RowId::arrow_from_slice(&sorted_row_ids);
216 }
217
218 let Self {
219 id: _,
220 entity_path: _,
221 heap_size_bytes: _,
222 is_sorted: _,
223 row_ids: _,
224 timelines,
225 components,
226 } = self;
227
228 {
230 re_tracing::profile_scope!("timelines");
231
232 for info in timelines.values_mut() {
233 let TimeColumn {
234 timeline: _,
235 times,
236 is_sorted,
237 time_range: _,
238 } = info;
239
240 let mut sorted = times.to_vec();
241 for (to, from) in swaps.iter().copied().enumerate() {
242 sorted[to] = times[from];
243 }
244
245 *is_sorted = sorted.windows(2).all(|times| times[0] <= times[1]);
246 *times = ArrowScalarBuffer::from(sorted);
247 }
248 }
249
250 re_tracing::profile_scope!("components (offsets & data)");
254 {
255 for original in components.list_arrays_mut() {
256 let sorted_arrays = swaps
257 .iter()
258 .copied()
259 .map(|from| original.value(from))
260 .collect_vec();
261 let sorted_arrays = sorted_arrays
262 .iter()
263 .map(|array| &**array as &dyn ArrowArray)
264 .collect_vec();
265
266 let datatype = original.data_type().clone();
267 let offsets =
268 ArrowOffsets::from_lengths(sorted_arrays.iter().map(|array| array.len()));
269 #[expect(clippy::unwrap_used)] let values = re_arrow_util::concat_arrays(&sorted_arrays).unwrap();
271 let validity = original
272 .nulls()
273 .map(|validity| swaps.iter().map(|&from| validity.is_valid(from)).collect());
274
275 let field = match datatype {
276 arrow::datatypes::DataType::List(field) => field.clone(),
277 _ => unreachable!("This is always s list array"),
278 };
279 *original = ArrowListArray::new(field, offsets, values, validity);
280 }
281 }
282
283 self.is_sorted = self.is_sorted_uncached();
284 }
285}
286
287impl TimeColumn {
288 #[inline]
294 pub fn is_sorted(&self) -> bool {
295 self.is_sorted
296 }
297
298 #[inline]
305 pub fn is_sorted_uncached(&self) -> bool {
306 re_tracing::profile_function!();
307 self.times_raw()
308 .windows(2)
309 .all(|times| times[0] <= times[1])
310 }
311}
312
#[cfg(test)]
mod tests {
    use re_log_types::example_components::{MyColor, MyPoint, MyPoints};
    use re_log_types::{EntityPath, Timeline};
    use re_types_core::ComponentBatch as _;

    use super::*;
    use crate::{ChunkId, RowId};

    /// Shuffling a sorted chunk and sorting it again must yield the original chunk.
    #[test]
    fn sort() -> anyhow::Result<()> {
        let entity_path: EntityPath = "a/b/c".into();

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let points1 = vec![
            MyPoint::new(1.0, 2.0),
            MyPoint::new(3.0, 4.0),
            MyPoint::new(5.0, 6.0),
        ];
        let points3 = vec![MyPoint::new(10.0, 20.0)];
        let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];

        let colors1 = vec![
            MyColor::from_rgb(1, 2, 3),
            MyColor::from_rgb(4, 5, 6),
            MyColor::from_rgb(7, 8, 9),
        ];
        let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
        let colors4 = vec![
            MyColor::from_rgb(101, 102, 103),
            MyColor::from_rgb(104, 105, 106),
        ];

        {
            // Row IDs are generated in insertion order, so this chunk starts out sorted.
            let chunk_sorted = Chunk::builder(entity_path.clone())
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1000), (timeline2, 42)],
                    [
                        (MyPoints::descriptor_points(), Some(&points1 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1001), (timeline2, 43)],
                    [
                        (MyPoints::descriptor_points(), None),
                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1002), (timeline2, 44)],
                    [
                        (MyPoints::descriptor_points(), Some(&points3 as _)),
                        (MyPoints::descriptor_colors(), None),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1003), (timeline2, 45)],
                    [
                        (MyPoints::descriptor_points(), Some(&points4 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                    ],
                )
                .build()?;

            eprintln!("{chunk_sorted}");

            assert!(chunk_sorted.is_sorted());
            assert!(chunk_sorted.is_sorted_uncached());

            let chunk_shuffled = {
                let mut chunk_shuffled = chunk_sorted.clone();
                chunk_shuffled.shuffle_random(666);
                chunk_shuffled
            };

            eprintln!("{chunk_shuffled}");

            assert!(!chunk_shuffled.is_sorted());
            assert!(!chunk_shuffled.is_sorted_uncached());
            assert_ne!(chunk_sorted, chunk_shuffled);

            let chunk_resorted = {
                let mut chunk_resorted = chunk_shuffled.clone();
                chunk_resorted.sort_if_unsorted();
                chunk_resorted
            };

            eprintln!("{chunk_resorted}");

            assert!(chunk_resorted.is_sorted());
            assert!(chunk_resorted.is_sorted_uncached());
            assert_eq!(chunk_sorted, chunk_resorted);
        }

        Ok(())
    }

    /// Sorting by a timeline whose values run opposite to the row IDs must reverse the rows.
    #[test]
    fn sort_time() -> anyhow::Result<()> {
        let entity_path: EntityPath = "a/b/c".into();

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let chunk_id = ChunkId::new();
        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();

        let points1 = vec![
            MyPoint::new(1.0, 2.0),
            MyPoint::new(3.0, 4.0),
            MyPoint::new(5.0, 6.0),
        ];
        let points3 = vec![MyPoint::new(10.0, 20.0)];
        let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];

        let colors1 = vec![
            MyColor::from_rgb(1, 2, 3),
            MyColor::from_rgb(4, 5, 6),
            MyColor::from_rgb(7, 8, 9),
        ];
        let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
        let colors4 = vec![
            MyColor::from_rgb(101, 102, 103),
            MyColor::from_rgb(104, 105, 106),
        ];

        {
            // `timeline1` ascends with the row IDs while `timeline2` descends.
            let chunk_unsorted_timeline2 = Chunk::builder_with_id(chunk_id, entity_path.clone())
                .with_sparse_component_batches(
                    row_id1,
                    [(timeline1, 1000), (timeline2, 45)],
                    [
                        (MyPoints::descriptor_points(), Some(&points1 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    row_id2,
                    [(timeline1, 1001), (timeline2, 44)],
                    [
                        (MyPoints::descriptor_points(), None),
                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    row_id3,
                    [(timeline1, 1002), (timeline2, 43)],
                    [
                        (MyPoints::descriptor_points(), Some(&points3 as _)),
                        (MyPoints::descriptor_colors(), None),
                    ],
                )
                .with_sparse_component_batches(
                    row_id4,
                    [(timeline1, 1003), (timeline2, 42)],
                    [
                        (MyPoints::descriptor_points(), Some(&points4 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                    ],
                )
                .build()?;

            eprintln!("unsorted:\n{chunk_unsorted_timeline2}");

            assert!(chunk_unsorted_timeline2.is_sorted());
            assert!(chunk_unsorted_timeline2.is_sorted_uncached());

            assert!(
                chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            assert!(
                !chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                !chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            let chunk_sorted_timeline2 =
                chunk_unsorted_timeline2.sorted_by_timeline_if_unsorted(timeline2.name());

            eprintln!("sorted:\n{chunk_sorted_timeline2}");

            // Sorting by `timeline2` reverses the rows, which un-sorts row IDs and `timeline1`.
            assert!(!chunk_sorted_timeline2.is_sorted());
            assert!(!chunk_sorted_timeline2.is_sorted_uncached());

            assert!(
                !chunk_sorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                !chunk_sorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            assert!(
                chunk_sorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                chunk_sorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            let chunk_sorted_timeline2_expected =
                Chunk::builder_with_id(chunk_id, entity_path.clone())
                    .with_sparse_component_batches(
                        row_id4,
                        [(timeline1, 1003), (timeline2, 42)],
                        [
                            (MyPoints::descriptor_points(), Some(&points4 as _)),
                            (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id3,
                        [(timeline1, 1002), (timeline2, 43)],
                        [
                            (MyPoints::descriptor_points(), Some(&points3 as _)),
                            (MyPoints::descriptor_colors(), None),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id2,
                        [(timeline1, 1001), (timeline2, 44)],
                        [
                            (MyPoints::descriptor_points(), None),
                            (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id1,
                        [(timeline1, 1000), (timeline2, 45)],
                        [
                            (MyPoints::descriptor_points(), Some(&points1 as _)),
                            (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                        ],
                    )
                    .build()?;

            // Print the expected chunk here; the actual one was printed above as "sorted".
            eprintln!("expected:\n{chunk_sorted_timeline2_expected}");

            assert_eq!(
                chunk_sorted_timeline2_expected,
                chunk_sorted_timeline2,
                "{}",
                // The labels follow the argument order: expected text first, actual text second.
                similar_asserts::SimpleDiff::from_str(
                    &format!("{chunk_sorted_timeline2_expected}"),
                    &format!("{chunk_sorted_timeline2}"),
                    "expected",
                    "got",
                ),
            );
        }

        Ok(())
    }

    /// Builds a chunk from unsorted time columns and checks the resulting row order.
    #[test]
    fn from_auto_row_ids_sorts_lexicographically() -> anyhow::Result<()> {
        let entity_path: EntityPath = "a/b/c".into();

        // Input rows as (alpha, beta): (2, 5), (1, 9), (1, 7), (2, 3), (1, 9).
        let alpha = TimeColumn::new_sequence("alpha", [2_i64, 1, 1, 2, 1]);
        let beta = TimeColumn::new_sequence("beta", [5_i64, 9, 7, 3, 9]);

        let colors = vec![
            MyColor(100),
            MyColor(200),
            MyColor(300),
            MyColor(400),
            MyColor(500),
        ];
        let colors_array = colors.to_arrow_list_array()?;

        let chunk = Chunk::from_columns(
            entity_path,
            [alpha, beta],
            [(MyPoints::descriptor_colors(), colors_array)],
        )?;

        eprintln!("{chunk}");

        assert!(chunk.is_sorted());
        assert!(chunk.is_sorted_uncached());

        let alpha = chunk.timelines().get(&"alpha".into()).unwrap();
        let beta = chunk.timelines().get(&"beta".into()).unwrap();

        // Only the first timeline ends up globally sorted; the second is sorted within ties.
        assert!(alpha.is_sorted());
        assert!(!beta.is_sorted());

        // Expected order is the input rows sorted by (alpha, beta), i.e. rows 2, 1, 4, 3, 0.
        assert_eq!(alpha.times_raw().to_vec(), vec![1, 1, 1, 2, 2]);
        assert_eq!(beta.times_raw().to_vec(), vec![7, 9, 9, 3, 5]);

        // The component data must have been moved along with the time values.
        let got_colors: Vec<u32> = chunk
            .iter_slices::<u32>(MyPoints::descriptor_colors().component)
            .flat_map(<[u32]>::to_vec)
            .collect();
        assert_eq!(got_colors, vec![300, 200, 500, 400, 100]);

        let row_ids: Vec<_> = chunk.row_ids().collect();
        for w in row_ids.windows(2) {
            assert!(w[0] < w[1], "row_ids must be strictly ascending");
        }

        Ok(())
    }
}