1use arrow::array::{Array as ArrowArray, ListArray as ArrowListArray};
2use arrow::buffer::{OffsetBuffer as ArrowOffsets, ScalarBuffer as ArrowScalarBuffer};
3use itertools::Itertools as _;
4use re_log_types::TimelineName;
5
6use crate::{Chunk, TimeColumn};
7
8impl Chunk {
11 #[inline]
17 pub fn is_sorted(&self) -> bool {
18 self.is_sorted
19 }
20
21 #[doc(hidden)]
23 #[inline]
24 pub fn is_sorted_uncached(&self) -> bool {
25 re_tracing::profile_function!();
26
27 self.row_ids()
28 .tuple_windows::<(_, _)>()
29 .all(|row_ids| row_ids.0 <= row_ids.1)
30 }
31
32 #[inline]
36 pub fn is_time_sorted(&self) -> bool {
37 self.timelines
38 .values()
39 .all(|time_column| time_column.is_sorted())
40 }
41
42 #[inline]
48 pub fn is_timeline_sorted(&self, timeline: &TimelineName) -> bool {
49 self.is_static()
50 || self
51 .timelines
52 .get(timeline)
53 .is_some_and(|time_column| time_column.is_sorted())
54 }
55
56 #[doc(hidden)]
58 #[inline]
59 pub fn is_timeline_sorted_uncached(&self, timeline: &TimelineName) -> bool {
60 self.is_static()
61 || self
62 .timelines
63 .get(timeline)
64 .is_some_and(|time_column| time_column.is_sorted_uncached())
65 }
66
67 #[inline]
71 pub fn sort_if_unsorted(&mut self) {
72 if self.is_sorted() {
73 return;
74 }
75
76 re_tracing::profile_function!();
77
78 #[cfg(not(target_arch = "wasm32"))]
79 let now = std::time::Instant::now();
80
81 let swaps = {
82 re_tracing::profile_scope!("swaps");
83 let row_ids = self.row_ids_slice();
84 let mut swaps = (0..row_ids.len()).collect::<Vec<_>>();
85 swaps.sort_by_key(|&i| row_ids[i]);
86 swaps
87 };
88
89 self.shuffle_with(&swaps);
90
91 #[cfg(not(target_arch = "wasm32"))]
92 re_log::trace!(
93 entity_path = %self.entity_path,
94 num_rows = self.row_ids.len(),
95 elapsed = ?now.elapsed(),
96 "chunk sorted",
97 );
98
99 #[cfg(debug_assertions)]
100 #[expect(clippy::unwrap_used)] self.sanity_check().unwrap();
102 }
103
104 #[must_use]
112 pub fn sorted_by_timeline_if_unsorted(&self, timeline: &TimelineName) -> Self {
113 let mut chunk = self.clone();
114
115 let Some(time_column) = chunk.timelines.get(timeline) else {
116 return chunk;
117 };
118
119 if time_column.is_sorted() {
120 return chunk;
121 }
122
123 re_tracing::profile_function!();
124
125 #[cfg(not(target_arch = "wasm32"))]
126 let now = std::time::Instant::now();
127
128 let swaps = {
129 re_tracing::profile_scope!("swaps");
130 let row_ids = chunk.row_ids_slice();
131 let times = time_column.times_raw().to_vec();
132 let mut swaps = (0..times.len()).collect::<Vec<_>>();
133 swaps.sort_by_key(|&i| (times[i], row_ids[i]));
134 swaps
135 };
136
137 chunk.shuffle_with(&swaps);
138
139 #[cfg(not(target_arch = "wasm32"))]
140 re_log::trace!(
141 entity_path = %chunk.entity_path,
142 num_rows = chunk.row_ids.len(),
143 elapsed = ?now.elapsed(),
144 "chunk sorted",
145 );
146
147 #[cfg(debug_assertions)]
148 #[expect(clippy::unwrap_used)] chunk.sanity_check().unwrap();
150
151 chunk
152 }
153
154 #[inline]
158 pub fn shuffle_random(&mut self, seed: u64) {
159 re_tracing::profile_function!();
160
161 #[cfg(not(target_arch = "wasm32"))]
162 let now = std::time::Instant::now();
163
164 use rand::SeedableRng as _;
165 use rand::seq::SliceRandom as _;
166 let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
167
168 let swaps = {
169 re_tracing::profile_scope!("swaps");
170 let mut swaps = (0..self.row_ids.len()).collect::<Vec<_>>();
171 swaps.shuffle(&mut rng);
172 swaps
173 };
174
175 self.shuffle_with(&swaps);
176
177 #[cfg(not(target_arch = "wasm32"))]
178 re_log::trace!(
179 entity_path = %self.entity_path,
180 num_rows = self.row_ids.len(),
181 elapsed = ?now.elapsed(),
182 "chunk shuffled",
183 );
184 }
185
186 pub(crate) fn shuffle_with(&mut self, swaps: &[usize]) {
195 re_tracing::profile_function!();
196
197 {
199 re_tracing::profile_scope!("row ids");
200
201 let row_ids = self.row_ids_slice();
202
203 let mut sorted_row_ids = row_ids.to_vec();
204 for (to, from) in swaps.iter().copied().enumerate() {
205 sorted_row_ids[to] = row_ids[from];
206 }
207 self.row_ids = re_types_core::RowId::arrow_from_slice(&sorted_row_ids);
208 }
209
210 let Self {
211 id: _,
212 entity_path: _,
213 heap_size_bytes: _,
214 is_sorted: _,
215 row_ids: _,
216 timelines,
217 components,
218 } = self;
219
220 {
222 re_tracing::profile_scope!("timelines");
223
224 for info in timelines.values_mut() {
225 let TimeColumn {
226 timeline: _,
227 times,
228 is_sorted,
229 time_range: _,
230 } = info;
231
232 let mut sorted = times.to_vec();
233 for (to, from) in swaps.iter().copied().enumerate() {
234 sorted[to] = times[from];
235 }
236
237 *is_sorted = sorted.windows(2).all(|times| times[0] <= times[1]);
238 *times = ArrowScalarBuffer::from(sorted);
239 }
240 }
241
242 re_tracing::profile_scope!("components (offsets & data)");
246 {
247 for original in components.list_arrays_mut() {
248 let sorted_arrays = swaps
249 .iter()
250 .copied()
251 .map(|from| original.value(from))
252 .collect_vec();
253 let sorted_arrays = sorted_arrays
254 .iter()
255 .map(|array| &**array as &dyn ArrowArray)
256 .collect_vec();
257
258 let datatype = original.data_type().clone();
259 let offsets =
260 ArrowOffsets::from_lengths(sorted_arrays.iter().map(|array| array.len()));
261 #[expect(clippy::unwrap_used)] let values = re_arrow_util::concat_arrays(&sorted_arrays).unwrap();
263 let validity = original
264 .nulls()
265 .map(|validity| swaps.iter().map(|&from| validity.is_valid(from)).collect());
266
267 let field = match datatype {
268 arrow::datatypes::DataType::List(field) => field.clone(),
269 _ => unreachable!("This is always s list array"),
270 };
271 *original = ArrowListArray::new(field, offsets, values, validity);
272 }
273 }
274
275 self.is_sorted = self.is_sorted_uncached();
276 }
277}
278
279impl TimeColumn {
280 #[inline]
286 pub fn is_sorted(&self) -> bool {
287 self.is_sorted
288 }
289
290 #[inline]
297 pub fn is_sorted_uncached(&self) -> bool {
298 re_tracing::profile_function!();
299 self.times_raw()
300 .windows(2)
301 .all(|times| times[0] <= times[1])
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use re_log_types::example_components::{MyColor, MyPoint, MyPoints};
308 use re_log_types::{EntityPath, Timeline};
309
310 use super::*;
311 use crate::{ChunkId, RowId};
312
/// Shuffling a sorted chunk and then sorting it again must give back the original chunk.
#[test]
fn sort() -> anyhow::Result<()> {
    let entity_path: EntityPath = "a/b/c".into();

    let timeline1 = Timeline::new_duration("log_time");
    let timeline2 = Timeline::new_sequence("frame_nr");

    // Row IDs are allocated up front, in the same order the rows are appended below.
    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();
    let row_id4 = RowId::new();

    let points1 = vec![
        MyPoint::new(1.0, 2.0),
        MyPoint::new(3.0, 4.0),
        MyPoint::new(5.0, 6.0),
    ];
    let points3 = vec![MyPoint::new(10.0, 20.0)];
    let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];

    let colors1 = vec![
        MyColor::from_rgb(1, 2, 3),
        MyColor::from_rgb(4, 5, 6),
        MyColor::from_rgb(7, 8, 9),
    ];
    let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
    let colors4 = vec![
        MyColor::from_rgb(101, 102, 103),
        MyColor::from_rgb(104, 105, 106),
    ];

    let chunk_sorted = Chunk::builder(entity_path.clone())
        .with_sparse_component_batches(
            row_id1,
            [(timeline1, 1000), (timeline2, 42)],
            [
                (MyPoints::descriptor_points(), Some(&points1 as _)),
                (MyPoints::descriptor_colors(), Some(&colors1 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id2,
            [(timeline1, 1001), (timeline2, 43)],
            [
                (MyPoints::descriptor_points(), None),
                (MyPoints::descriptor_colors(), Some(&colors2 as _)),
            ],
        )
        .with_sparse_component_batches(
            row_id3,
            [(timeline1, 1002), (timeline2, 44)],
            [
                (MyPoints::descriptor_points(), Some(&points3 as _)),
                (MyPoints::descriptor_colors(), None),
            ],
        )
        .with_sparse_component_batches(
            row_id4,
            [(timeline1, 1003), (timeline2, 45)],
            [
                (MyPoints::descriptor_points(), Some(&points4 as _)),
                (MyPoints::descriptor_colors(), Some(&colors4 as _)),
            ],
        )
        .build()?;

    eprintln!("{chunk_sorted}");

    // Freshly built: both the cached flag and the recomputed answer say "sorted".
    assert!(chunk_sorted.is_sorted());
    assert!(chunk_sorted.is_sorted_uncached());

    let mut chunk_shuffled = chunk_sorted.clone();
    chunk_shuffled.shuffle_random(666);

    eprintln!("{chunk_shuffled}");

    assert!(!chunk_shuffled.is_sorted());
    assert!(!chunk_shuffled.is_sorted_uncached());
    assert_ne!(chunk_sorted, chunk_shuffled);

    let mut chunk_resorted = chunk_shuffled.clone();
    chunk_resorted.sort_if_unsorted();

    eprintln!("{chunk_resorted}");

    // Sorting must undo the shuffle exactly.
    assert!(chunk_resorted.is_sorted());
    assert!(chunk_resorted.is_sorted_uncached());
    assert_eq!(chunk_sorted, chunk_resorted);

    Ok(())
}
407
/// Sorting along a timeline whose times run opposite to the row IDs must reverse the rows.
///
/// `timeline1` increases with the row IDs while `timeline2` decreases, so after sorting by
/// `timeline2` both the row IDs and `timeline1` end up unsorted.
#[test]
fn sort_time() -> anyhow::Result<()> {
    let entity_path: EntityPath = "a/b/c".into();

    let timeline1 = Timeline::new_duration("log_time");
    let timeline2 = Timeline::new_sequence("frame_nr");

    let chunk_id = ChunkId::new();
    let row_id1 = RowId::new();
    let row_id2 = RowId::new();
    let row_id3 = RowId::new();
    let row_id4 = RowId::new();

    let points1 = vec![
        MyPoint::new(1.0, 2.0),
        MyPoint::new(3.0, 4.0),
        MyPoint::new(5.0, 6.0),
    ];
    let points3 = vec![MyPoint::new(10.0, 20.0)];
    let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];

    let colors1 = vec![
        MyColor::from_rgb(1, 2, 3),
        MyColor::from_rgb(4, 5, 6),
        MyColor::from_rgb(7, 8, 9),
    ];
    let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
    let colors4 = vec![
        MyColor::from_rgb(101, 102, 103),
        MyColor::from_rgb(104, 105, 106),
    ];

    {
        let chunk_unsorted_timeline2 = Chunk::builder_with_id(chunk_id, entity_path.clone())
            .with_sparse_component_batches(
                row_id1,
                [(timeline1, 1000), (timeline2, 45)],
                [
                    (MyPoints::descriptor_points(), Some(&points1 as _)),
                    (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                ],
            )
            .with_sparse_component_batches(
                row_id2,
                [(timeline1, 1001), (timeline2, 44)],
                [
                    (MyPoints::descriptor_points(), None),
                    (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                ],
            )
            .with_sparse_component_batches(
                row_id3,
                [(timeline1, 1002), (timeline2, 43)],
                [
                    (MyPoints::descriptor_points(), Some(&points3 as _)),
                    (MyPoints::descriptor_colors(), None),
                ],
            )
            .with_sparse_component_batches(
                row_id4,
                [(timeline1, 1003), (timeline2, 42)],
                [
                    (MyPoints::descriptor_points(), Some(&points4 as _)),
                    (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                ],
            )
            .build()?;

        eprintln!("unsorted:\n{chunk_unsorted_timeline2}");

        // Before sorting: row IDs and `timeline1` are in order, `timeline2` is not.
        assert!(chunk_unsorted_timeline2.is_sorted());
        assert!(chunk_unsorted_timeline2.is_sorted_uncached());

        assert!(
            chunk_unsorted_timeline2
                .timelines()
                .get(timeline1.name())
                .unwrap()
                .is_sorted()
        );
        assert!(
            chunk_unsorted_timeline2
                .timelines()
                .get(timeline1.name())
                .unwrap()
                .is_sorted_uncached()
        );

        assert!(
            !chunk_unsorted_timeline2
                .timelines()
                .get(timeline2.name())
                .unwrap()
                .is_sorted()
        );
        assert!(
            !chunk_unsorted_timeline2
                .timelines()
                .get(timeline2.name())
                .unwrap()
                .is_sorted_uncached()
        );

        let chunk_sorted_timeline2 =
            chunk_unsorted_timeline2.sorted_by_timeline_if_unsorted(timeline2.name());

        eprintln!("sorted:\n{chunk_sorted_timeline2}");

        // After sorting: the situation is exactly mirrored.
        assert!(!chunk_sorted_timeline2.is_sorted());
        assert!(!chunk_sorted_timeline2.is_sorted_uncached());

        assert!(
            !chunk_sorted_timeline2
                .timelines()
                .get(timeline1.name())
                .unwrap()
                .is_sorted()
        );
        assert!(
            !chunk_sorted_timeline2
                .timelines()
                .get(timeline1.name())
                .unwrap()
                .is_sorted_uncached()
        );

        assert!(
            chunk_sorted_timeline2
                .timelines()
                .get(timeline2.name())
                .unwrap()
                .is_sorted()
        );
        assert!(
            chunk_sorted_timeline2
                .timelines()
                .get(timeline2.name())
                .unwrap()
                .is_sorted_uncached()
        );

        // Same rows as above, appended in reverse order.
        let chunk_sorted_timeline2_expected =
            Chunk::builder_with_id(chunk_id, entity_path.clone())
                .with_sparse_component_batches(
                    row_id4,
                    [(timeline1, 1003), (timeline2, 42)],
                    [
                        (MyPoints::descriptor_points(), Some(&points4 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    row_id3,
                    [(timeline1, 1002), (timeline2, 43)],
                    [
                        (MyPoints::descriptor_points(), Some(&points3 as _)),
                        (MyPoints::descriptor_colors(), None),
                    ],
                )
                .with_sparse_component_batches(
                    row_id2,
                    [(timeline1, 1001), (timeline2, 44)],
                    [
                        (MyPoints::descriptor_points(), None),
                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    row_id1,
                    [(timeline1, 1000), (timeline2, 45)],
                    [
                        (MyPoints::descriptor_points(), Some(&points1 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                    ],
                )
                .build()?;

        // Print the expected chunk here, not the actual one a second time.
        eprintln!("expected:\n{chunk_sorted_timeline2_expected}");

        assert_eq!(
            chunk_sorted_timeline2_expected,
            chunk_sorted_timeline2,
            "{}",
            // Labels follow the argument order: left is expected, right is what we got.
            similar_asserts::SimpleDiff::from_str(
                &format!("{chunk_sorted_timeline2_expected}"),
                &format!("{chunk_sorted_timeline2}"),
                "expected",
                "got",
            ),
        );
    }

    Ok(())
}
602}