1use arrow::{
2 array::{Array as ArrowArray, ListArray as ArrowListArray},
3 buffer::{OffsetBuffer as ArrowOffsets, ScalarBuffer as ArrowScalarBuffer},
4};
5use itertools::Itertools as _;
6
7use re_log_types::TimelineName;
8
9use crate::{Chunk, TimeColumn};
10
11impl Chunk {
14 #[inline]
20 pub fn is_sorted(&self) -> bool {
21 self.is_sorted
22 }
23
24 #[doc(hidden)]
26 #[inline]
27 pub fn is_sorted_uncached(&self) -> bool {
28 re_tracing::profile_function!();
29
30 self.row_ids()
31 .tuple_windows::<(_, _)>()
32 .all(|row_ids| row_ids.0 <= row_ids.1)
33 }
34
35 #[inline]
39 pub fn is_time_sorted(&self) -> bool {
40 self.timelines
41 .values()
42 .all(|time_column| time_column.is_sorted())
43 }
44
45 #[inline]
51 pub fn is_timeline_sorted(&self, timeline: &TimelineName) -> bool {
52 self.is_static()
53 || self
54 .timelines
55 .get(timeline)
56 .is_some_and(|time_column| time_column.is_sorted())
57 }
58
59 #[doc(hidden)]
61 #[inline]
62 pub fn is_timeline_sorted_uncached(&self, timeline: &TimelineName) -> bool {
63 self.is_static()
64 || self
65 .timelines
66 .get(timeline)
67 .is_some_and(|time_column| time_column.is_sorted_uncached())
68 }
69
70 #[inline]
74 pub fn sort_if_unsorted(&mut self) {
75 if self.is_sorted() {
76 return;
77 }
78
79 re_tracing::profile_function!();
80
81 #[cfg(not(target_arch = "wasm32"))]
82 let now = std::time::Instant::now();
83
84 let swaps = {
85 re_tracing::profile_scope!("swaps");
86 let row_ids = self.row_ids_slice();
87 let mut swaps = (0..row_ids.len()).collect::<Vec<_>>();
88 swaps.sort_by_key(|&i| row_ids[i]);
89 swaps
90 };
91
92 self.shuffle_with(&swaps);
93
94 #[cfg(not(target_arch = "wasm32"))]
95 re_log::trace!(
96 entity_path = %self.entity_path,
97 num_rows = self.row_ids.len(),
98 elapsed = ?now.elapsed(),
99 "chunk sorted",
100 );
101
102 #[cfg(debug_assertions)]
103 #[expect(clippy::unwrap_used)] self.sanity_check().unwrap();
105 }
106
107 #[must_use]
115 pub fn sorted_by_timeline_if_unsorted(&self, timeline: &TimelineName) -> Self {
116 let mut chunk = self.clone();
117
118 let Some(time_column) = chunk.timelines.get(timeline) else {
119 return chunk;
120 };
121
122 if time_column.is_sorted() {
123 return chunk;
124 }
125
126 re_tracing::profile_function!();
127
128 #[cfg(not(target_arch = "wasm32"))]
129 let now = std::time::Instant::now();
130
131 let swaps = {
132 re_tracing::profile_scope!("swaps");
133 let row_ids = chunk.row_ids_slice();
134 let times = time_column.times_raw().to_vec();
135 let mut swaps = (0..times.len()).collect::<Vec<_>>();
136 swaps.sort_by_key(|&i| (times[i], row_ids[i]));
137 swaps
138 };
139
140 chunk.shuffle_with(&swaps);
141
142 #[cfg(not(target_arch = "wasm32"))]
143 re_log::trace!(
144 entity_path = %chunk.entity_path,
145 num_rows = chunk.row_ids.len(),
146 elapsed = ?now.elapsed(),
147 "chunk sorted",
148 );
149
150 #[cfg(debug_assertions)]
151 #[expect(clippy::unwrap_used)] chunk.sanity_check().unwrap();
153
154 chunk
155 }
156
157 #[inline]
161 pub fn shuffle_random(&mut self, seed: u64) {
162 re_tracing::profile_function!();
163
164 #[cfg(not(target_arch = "wasm32"))]
165 let now = std::time::Instant::now();
166
167 use rand::{SeedableRng as _, seq::SliceRandom as _};
168 let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
169
170 let swaps = {
171 re_tracing::profile_scope!("swaps");
172 let mut swaps = (0..self.row_ids.len()).collect::<Vec<_>>();
173 swaps.shuffle(&mut rng);
174 swaps
175 };
176
177 self.shuffle_with(&swaps);
178
179 #[cfg(not(target_arch = "wasm32"))]
180 re_log::trace!(
181 entity_path = %self.entity_path,
182 num_rows = self.row_ids.len(),
183 elapsed = ?now.elapsed(),
184 "chunk shuffled",
185 );
186 }
187
188 pub(crate) fn shuffle_with(&mut self, swaps: &[usize]) {
197 re_tracing::profile_function!();
198
199 {
201 re_tracing::profile_scope!("row ids");
202
203 let row_ids = self.row_ids_slice();
204
205 let mut sorted_row_ids = row_ids.to_vec();
206 for (to, from) in swaps.iter().copied().enumerate() {
207 sorted_row_ids[to] = row_ids[from];
208 }
209 self.row_ids = re_types_core::RowId::arrow_from_slice(&sorted_row_ids);
210 }
211
212 let Self {
213 id: _,
214 entity_path: _,
215 heap_size_bytes: _,
216 is_sorted: _,
217 row_ids: _,
218 timelines,
219 components,
220 } = self;
221
222 {
224 re_tracing::profile_scope!("timelines");
225
226 for info in timelines.values_mut() {
227 let TimeColumn {
228 timeline: _,
229 times,
230 is_sorted,
231 time_range: _,
232 } = info;
233
234 let mut sorted = times.to_vec();
235 for (to, from) in swaps.iter().copied().enumerate() {
236 sorted[to] = times[from];
237 }
238
239 *is_sorted = sorted.windows(2).all(|times| times[0] <= times[1]);
240 *times = ArrowScalarBuffer::from(sorted);
241 }
242 }
243
244 re_tracing::profile_scope!("components (offsets & data)");
248 {
249 for original in components.values_mut() {
250 let sorted_arrays = swaps
251 .iter()
252 .copied()
253 .map(|from| original.value(from))
254 .collect_vec();
255 let sorted_arrays = sorted_arrays
256 .iter()
257 .map(|array| &**array as &dyn ArrowArray)
258 .collect_vec();
259
260 let datatype = original.data_type().clone();
261 let offsets =
262 ArrowOffsets::from_lengths(sorted_arrays.iter().map(|array| array.len()));
263 #[expect(clippy::unwrap_used)] let values = re_arrow_util::concat_arrays(&sorted_arrays).unwrap();
265 let validity = original
266 .nulls()
267 .map(|validity| swaps.iter().map(|&from| validity.is_valid(from)).collect());
268
269 let field = match datatype {
270 arrow::datatypes::DataType::List(field) => field.clone(),
271 _ => unreachable!("This is always s list array"),
272 };
273 *original = ArrowListArray::new(field, offsets, values, validity);
274 }
275 }
276
277 self.is_sorted = self.is_sorted_uncached();
278 }
279}
280
281impl TimeColumn {
282 #[inline]
288 pub fn is_sorted(&self) -> bool {
289 self.is_sorted
290 }
291
292 #[inline]
299 pub fn is_sorted_uncached(&self) -> bool {
300 re_tracing::profile_function!();
301 self.times_raw()
302 .windows(2)
303 .all(|times| times[0] <= times[1])
304 }
305}
306
#[cfg(test)]
mod tests {
    use re_log_types::{
        EntityPath, Timeline,
        example_components::{MyColor, MyPoint, MyPoints},
    };

    use crate::{ChunkId, RowId};

    use super::*;

    /// Shuffling a `RowId`-sorted chunk, then re-sorting it, must give back the original chunk.
    #[test]
    fn sort() -> anyhow::Result<()> {
        let entity_path: EntityPath = "a/b/c".into();

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let points1 = vec![
            MyPoint::new(1.0, 2.0),
            MyPoint::new(3.0, 4.0),
            MyPoint::new(5.0, 6.0),
        ];
        let points3 = vec![MyPoint::new(10.0, 20.0)];
        let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];

        let colors1 = vec![
            MyColor::from_rgb(1, 2, 3),
            MyColor::from_rgb(4, 5, 6),
            MyColor::from_rgb(7, 8, 9),
        ];
        let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
        let colors4 = vec![
            MyColor::from_rgb(101, 102, 103),
            MyColor::from_rgb(104, 105, 106),
        ];

        {
            let chunk_sorted = Chunk::builder(entity_path.clone())
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1000), (timeline2, 42)],
                    [
                        (MyPoints::descriptor_points(), Some(&points1 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1001), (timeline2, 43)],
                    [
                        (MyPoints::descriptor_points(), None),
                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1002), (timeline2, 44)],
                    [
                        (MyPoints::descriptor_points(), Some(&points3 as _)),
                        (MyPoints::descriptor_colors(), None),
                    ],
                )
                .with_sparse_component_batches(
                    RowId::new(),
                    [(timeline1, 1003), (timeline2, 45)],
                    [
                        (MyPoints::descriptor_points(), Some(&points4 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                    ],
                )
                .build()?;

            eprintln!("{chunk_sorted}");

            assert!(chunk_sorted.is_sorted());
            assert!(chunk_sorted.is_sorted_uncached());

            let chunk_shuffled = {
                let mut chunk_shuffled = chunk_sorted.clone();
                chunk_shuffled.shuffle_random(666);
                chunk_shuffled
            };

            eprintln!("{chunk_shuffled}");

            // The fixed seed is expected to produce a permutation that is not the identity.
            assert!(!chunk_shuffled.is_sorted());
            assert!(!chunk_shuffled.is_sorted_uncached());
            assert_ne!(chunk_sorted, chunk_shuffled);

            let chunk_resorted = {
                let mut chunk_resorted = chunk_shuffled.clone();
                chunk_resorted.sort_if_unsorted();
                chunk_resorted
            };

            eprintln!("{chunk_resorted}");

            assert!(chunk_resorted.is_sorted());
            assert!(chunk_resorted.is_sorted_uncached());
            assert_eq!(chunk_sorted, chunk_resorted);
        }

        Ok(())
    }

    /// Sorting by a timeline whose times run backwards (relative to `RowId`) must reverse the
    /// rows. It must also flip which timeline is sorted, and match a chunk built directly in
    /// that order.
    #[test]
    fn sort_time() -> anyhow::Result<()> {
        let entity_path: EntityPath = "a/b/c".into();

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let chunk_id = ChunkId::new();
        let row_id1 = RowId::new();
        let row_id2 = RowId::new();
        let row_id3 = RowId::new();
        let row_id4 = RowId::new();

        let points1 = vec![
            MyPoint::new(1.0, 2.0),
            MyPoint::new(3.0, 4.0),
            MyPoint::new(5.0, 6.0),
        ];
        let points3 = vec![MyPoint::new(10.0, 20.0)];
        let points4 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];

        let colors1 = vec![
            MyColor::from_rgb(1, 2, 3),
            MyColor::from_rgb(4, 5, 6),
            MyColor::from_rgb(7, 8, 9),
        ];
        let colors2 = vec![MyColor::from_rgb(10, 20, 30)];
        let colors4 = vec![
            MyColor::from_rgb(101, 102, 103),
            MyColor::from_rgb(104, 105, 106),
        ];

        {
            // Sorted by `RowId` and `timeline1`, but `timeline2` runs backwards.
            let chunk_unsorted_timeline2 = Chunk::builder_with_id(chunk_id, entity_path.clone())
                .with_sparse_component_batches(
                    row_id1,
                    [(timeline1, 1000), (timeline2, 45)],
                    [
                        (MyPoints::descriptor_points(), Some(&points1 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    row_id2,
                    [(timeline1, 1001), (timeline2, 44)],
                    [
                        (MyPoints::descriptor_points(), None),
                        (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                    ],
                )
                .with_sparse_component_batches(
                    row_id3,
                    [(timeline1, 1002), (timeline2, 43)],
                    [
                        (MyPoints::descriptor_points(), Some(&points3 as _)),
                        (MyPoints::descriptor_colors(), None),
                    ],
                )
                .with_sparse_component_batches(
                    row_id4,
                    [(timeline1, 1003), (timeline2, 42)],
                    [
                        (MyPoints::descriptor_points(), Some(&points4 as _)),
                        (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                    ],
                )
                .build()?;

            eprintln!("unsorted:\n{chunk_unsorted_timeline2}");

            assert!(chunk_unsorted_timeline2.is_sorted());
            assert!(chunk_unsorted_timeline2.is_sorted_uncached());

            assert!(
                chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            assert!(
                !chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                !chunk_unsorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            let chunk_sorted_timeline2 =
                chunk_unsorted_timeline2.sorted_by_timeline_if_unsorted(timeline2.name());

            eprintln!("sorted:\n{chunk_sorted_timeline2}");

            // Reversing the rows breaks the `RowId` and `timeline1` order...
            assert!(!chunk_sorted_timeline2.is_sorted());
            assert!(!chunk_sorted_timeline2.is_sorted_uncached());

            assert!(
                !chunk_sorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                !chunk_sorted_timeline2
                    .timelines()
                    .get(timeline1.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            // ...while fixing the `timeline2` one.
            assert!(
                chunk_sorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted()
            );
            assert!(
                chunk_sorted_timeline2
                    .timelines()
                    .get(timeline2.name())
                    .unwrap()
                    .is_sorted_uncached()
            );

            let chunk_sorted_timeline2_expected =
                Chunk::builder_with_id(chunk_id, entity_path.clone())
                    .with_sparse_component_batches(
                        row_id4,
                        [(timeline1, 1003), (timeline2, 42)],
                        [
                            (MyPoints::descriptor_points(), Some(&points4 as _)),
                            (MyPoints::descriptor_colors(), Some(&colors4 as _)),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id3,
                        [(timeline1, 1002), (timeline2, 43)],
                        [
                            (MyPoints::descriptor_points(), Some(&points3 as _)),
                            (MyPoints::descriptor_colors(), None),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id2,
                        [(timeline1, 1001), (timeline2, 44)],
                        [
                            (MyPoints::descriptor_points(), None),
                            (MyPoints::descriptor_colors(), Some(&colors2 as _)),
                        ],
                    )
                    .with_sparse_component_batches(
                        row_id1,
                        [(timeline1, 1000), (timeline2, 45)],
                        [
                            (MyPoints::descriptor_points(), Some(&points1 as _)),
                            (MyPoints::descriptor_colors(), Some(&colors1 as _)),
                        ],
                    )
                    .build()?;

            eprintln!("expected:\n{chunk_sorted_timeline2_expected}");

            // Left side is the expected chunk, right side is what we got: label them accordingly.
            assert_eq!(
                chunk_sorted_timeline2_expected,
                chunk_sorted_timeline2,
                "{}",
                similar_asserts::SimpleDiff::from_str(
                    &format!("{chunk_sorted_timeline2_expected}"),
                    &format!("{chunk_sorted_timeline2}"),
                    "expected",
                    "got",
                ),
            );
        }

        Ok(())
    }
}