use std::{
    hash::{Hash as _, Hasher as _},
    sync::Arc,
    time::{Duration, Instant},
};

use arrow::array::{Array as ArrowArray, ArrayRef};
use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use crossbeam::channel::{Receiver, Sender};
use nohash_hasher::IntMap;

use re_arrow_util::arrays_to_list_array_opt;
use re_byte_size::SizeBytes as _;
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline, TimelineName};
use re_types_core::ComponentDescriptor;

use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};

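/// Errors that can occur when configuring or spawning a [`ChunkBatcher`].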
#[derive(thiserror::Error, Debug)]
pub enum ChunkBatcherError {
    #[error("Failed to parse config: '{name}={value}': {err}")]
    ParseConfig {
        name: &'static str,
        value: String,
        err: Box<dyn std::error::Error + Send + Sync>,
    },

    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        name: &'static str,
        err: Box<dyn std::error::Error + Send + Sync>,
    },
}

pub type ChunkBatcherResult<T> = Result<T, ChunkBatcherError>;

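/// Optional callbacks that can be installed on a [`ChunkBatcher`].
///
/// `on_insert` is invoked with the entity's accumulated [`PendingRow`]s every time a new row is
/// appended. `on_release` is a release callback for the outgoing Arrow data; it is carried in the
/// configuration but not invoked by the batching thread defined in this file.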
#[derive(Clone, Default)]
pub struct BatcherHooks {
    #[allow(clippy::type_complexity)]
    pub on_insert: Option<Arc<dyn Fn(&[PendingRow]) + Send + Sync>>,

    pub on_release: Option<re_log_types::ArrowRecordBatchReleaseCallback>,
}

impl BatcherHooks {
    pub const NONE: Self = Self {
        on_insert: None,
        on_release: None,
    };
}

impl PartialEq for BatcherHooks {
    fn eq(&self, other: &Self) -> bool {
        let Self {
            on_insert,
            on_release,
        } = self;

        let on_insert_eq = match (on_insert, &other.on_insert) {
            (Some(a), Some(b)) => Arc::ptr_eq(a, b),
            (None, None) => true,
            _ => false,
        };

        on_insert_eq && on_release == &other.on_release
    }
}

impl std::fmt::Debug for BatcherHooks {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            on_insert,
            on_release,
        } = self;
        f.debug_struct("BatcherHooks")
            .field("on_insert", &on_insert.as_ref().map(|_| "…"))
            .field("on_release", &on_release)
            .finish()
    }
}

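/// Defines the flushing thresholds and hooks of a [`ChunkBatcher`].
///
/// Accumulated data is flushed as soon as any one of the thresholds below is reached, whichever
/// comes first.
///
/// A config is usually derived from one of the presets via struct-update syntax. A minimal
/// sketch (the field value is illustrative only):
///
/// ```ignore
/// let config = ChunkBatcherConfig {
///     flush_num_rows: 1, // flush after every single row
///     ..ChunkBatcherConfig::DEFAULT
/// };
/// ```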
#[derive(Clone, Debug, PartialEq)]
pub struct ChunkBatcherConfig {
    pub flush_tick: Duration,

    pub flush_num_bytes: u64,

    pub flush_num_rows: u64,

    pub chunk_max_rows_if_unsorted: u64,

    pub max_commands_in_flight: Option<u64>,

    pub max_chunks_in_flight: Option<u64>,

    pub hooks: BatcherHooks,
}

impl Default for ChunkBatcherConfig {
    fn default() -> Self {
        Self::DEFAULT
    }
}

impl ChunkBatcherConfig {
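    /// Default configuration, suitable for most use cases.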
    pub const DEFAULT: Self = Self {
        flush_tick: Duration::from_millis(8),
        flush_num_bytes: 1024 * 1024,
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
        hooks: BatcherHooks::NONE,
    };

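    /// Always flushes as soon as possible: every row and every byte triggers a flush.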
    pub const ALWAYS: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: 0,
        flush_num_rows: 0,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
        hooks: BatcherHooks::NONE,
    };

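    /// Never flushes automatically: only manual flushes and shutdown will push data through.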
    pub const NEVER: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: u64::MAX,
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
        hooks: BatcherHooks::NONE,
    };

    pub const ENV_FLUSH_TICK: &'static str = "RERUN_FLUSH_TICK_SECS";

    pub const ENV_FLUSH_NUM_BYTES: &'static str = "RERUN_FLUSH_NUM_BYTES";

    pub const ENV_FLUSH_NUM_ROWS: &'static str = "RERUN_FLUSH_NUM_ROWS";

    pub const ENV_CHUNK_MAX_ROWS_IF_UNSORTED: &'static str = "RERUN_CHUNK_MAX_ROWS_IF_UNSORTED";

    #[deprecated(note = "use `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` instead")]
    const ENV_MAX_CHUNK_ROWS_IF_UNSORTED: &'static str = "RERUN_MAX_CHUNK_ROWS_IF_UNSORTED";

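    /// Creates a new configuration using the default values, overridden by the `RERUN_*`
    /// environment variables listed above (see [`Self::apply_env`]).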
    #[inline]
    pub fn from_env() -> ChunkBatcherResult<Self> {
        Self::default().apply_env()
    }

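    /// Returns a copy of `self`, with any fields overridden by the corresponding `ENV_*`
    /// environment variables.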
    pub fn apply_env(&self) -> ChunkBatcherResult<Self> {
        let mut new = self.clone();

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_TICK) {
            let flush_duration_secs: f64 =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_TICK,
                    value: s.clone(),
                    err: Box::new(err),
                })?;

            new.flush_tick = Duration::from_secs_f64(flush_duration_secs);
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) {
            if let Some(num_bytes) = re_format::parse_bytes(&s) {
                new.flush_num_bytes = num_bytes.unsigned_abs();
            } else {
                new.flush_num_bytes = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_NUM_BYTES,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
            }
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_ROWS) {
            new.flush_num_rows = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                name: Self::ENV_FLUSH_NUM_ROWS,
                value: s.clone(),
                err: Box::new(err),
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        #[expect(deprecated)]
        if let Ok(s) = std::env::var(Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        Ok(new)
    }
}

#[test]
fn chunk_batcher_config() {
    std::env::set_var("RERUN_FLUSH_TICK_SECS", "0.3");
    std::env::set_var("RERUN_FLUSH_NUM_BYTES", "42");
    std::env::set_var("RERUN_FLUSH_NUM_ROWS", "666");
    std::env::set_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED", "7777");

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 7777,
        ..Default::default()
    };
    assert_eq!(expected, config);

    std::env::set_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED", "9999");

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 9999,
        ..Default::default()
    };
    assert_eq!(expected, config);
}

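/// Accumulates [`PendingRow`]s and turns them into [`Chunk`]s before sending them out.
///
/// Batching and flushing happen on a dedicated background thread, driven by the thresholds in
/// [`ChunkBatcherConfig`]. The batcher is cheap to clone: all clones share the same channels and
/// background thread, which shuts down once the last clone is dropped.
///
/// A minimal usage sketch (`entity_path`, `timepoint` and `components` stand in for whatever
/// data you actually log):
///
/// ```ignore
/// let batcher = ChunkBatcher::new(ChunkBatcherConfig::DEFAULT)?;
/// let chunks_rx = batcher.chunks();
///
/// // Produce rows…
/// batcher.push_row(entity_path, PendingRow::new(timepoint, components));
///
/// // …and consume the batched chunks on the other end.
/// batcher.flush_blocking();
/// while let Ok(chunk) = chunks_rx.try_recv() {
///     // Send the chunk over the wire, store it, etc.
/// }
/// ```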
#[derive(Clone)]
pub struct ChunkBatcher {
    inner: Arc<ChunkBatcherInner>,
}

struct ChunkBatcherInner {
    tx_cmds: Sender<Command>,
    rx_chunks: Option<Receiver<Chunk>>,
    cmds_to_chunks_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for ChunkBatcherInner {
    fn drop(&mut self) {
        if let Some(rx_chunks) = self.rx_chunks.take() {
            if !rx_chunks.is_empty() {
                re_log::warn!("Dropping data");
            }
        }

        self.tx_cmds.send(Command::Shutdown).ok();
        if let Some(handle) = self.cmds_to_chunks_handle.take() {
            handle.join().ok();
        }
    }
}

enum Command {
    AppendChunk(Chunk),
    AppendRow(EntityPath, PendingRow),
    Flush(Sender<()>),
    Shutdown,
}

impl Command {
    fn flush() -> (Self, Receiver<()>) {
        let (tx, rx) = crossbeam::channel::bounded(0);
        (Self::Flush(tx), rx)
    }
}

impl ChunkBatcher {
    #[must_use = "Batching threads will automatically shutdown when this object is dropped"]
    #[allow(clippy::needless_pass_by_value)]
    pub fn new(config: ChunkBatcherConfig) -> ChunkBatcherResult<Self> {
        let (tx_cmds, rx_cmd) = match config.max_commands_in_flight {
            Some(cap) => crossbeam::channel::bounded(cap as _),
            None => crossbeam::channel::unbounded(),
        };

        let (tx_chunk, rx_chunks) = match config.max_chunks_in_flight {
            Some(cap) => crossbeam::channel::bounded(cap as _),
            None => crossbeam::channel::unbounded(),
        };

        let cmds_to_chunks_handle = {
            const NAME: &str = "ChunkBatcher::cmds_to_chunks";
            std::thread::Builder::new()
                .name(NAME.into())
                .spawn({
                    let config = config.clone();
                    move || batching_thread(config, rx_cmd, tx_chunk)
                })
                .map_err(|err| ChunkBatcherError::SpawnThread {
                    name: NAME,
                    err: Box::new(err),
                })?
        };

        re_log::debug!(?config, "creating new chunk batcher");

        let inner = ChunkBatcherInner {
            tx_cmds,
            rx_chunks: Some(rx_chunks),
            cmds_to_chunks_handle: Some(cmds_to_chunks_handle),
        };

        Ok(Self {
            inner: Arc::new(inner),
        })
    }

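    /// Pushes a fully formed [`Chunk`] down the batching pipeline.
    ///
    /// The chunk is forwarded to the output channel without being re-batched with pending rows.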
    pub fn push_chunk(&self, chunk: Chunk) {
        self.inner.push_chunk(chunk);
    }

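    /// Pushes a [`PendingRow`] down the batching pipeline.
    ///
    /// The row is accumulated with other rows for the same entity until one of the configured
    /// thresholds triggers a flush, at which point it ends up in one or more [`Chunk`]s.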
    #[inline]
    pub fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.inner.push_row(entity_path, row);
    }

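    /// Initiates a flush of all pending data, without waiting for the flush to complete.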
    #[inline]
    pub fn flush_async(&self) {
        self.inner.flush_async();
    }

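    /// Initiates a flush of all pending data and blocks until the batching thread has processed it.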
    #[inline]
    pub fn flush_blocking(&self) {
        self.inner.flush_blocking();
    }

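    /// Returns the receiving end of the channel on which batched [`Chunk`]s are delivered.
    ///
    /// All clones of the batcher share this channel.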
    pub fn chunks(&self) -> Receiver<Chunk> {
        #[allow(clippy::unwrap_used)]
        self.inner.rx_chunks.clone().unwrap()
    }
}

impl ChunkBatcherInner {
    fn push_chunk(&self, chunk: Chunk) {
        self.send_cmd(Command::AppendChunk(chunk));
    }

    fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.send_cmd(Command::AppendRow(entity_path, row));
    }

    fn flush_async(&self) {
        let (flush_cmd, _) = Command::flush();
        self.send_cmd(flush_cmd);
    }

    fn flush_blocking(&self) {
        let (flush_cmd, oneshot) = Command::flush();
        self.send_cmd(flush_cmd);
        oneshot.recv().ok();
    }

    fn send_cmd(&self, cmd: Command) {
        self.tx_cmds.send(cmd).ok();
    }
}

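/// The background thread of the [`ChunkBatcher`]: accumulates rows per entity and flushes them
/// into [`Chunk`]s whenever one of the configured thresholds (tick, rows, bytes) is reached, or
/// when a manual flush or shutdown is requested.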
#[allow(clippy::needless_pass_by_value)]
fn batching_thread(config: ChunkBatcherConfig, rx_cmd: Receiver<Command>, tx_chunk: Sender<Chunk>) {
    let rx_tick = crossbeam::channel::tick(config.flush_tick);

    struct Accumulator {
        latest: Instant,
        entity_path: EntityPath,
        pending_rows: Vec<PendingRow>,
        pending_num_bytes: u64,
    }

    impl Accumulator {
        fn new(entity_path: EntityPath) -> Self {
            Self {
                entity_path,
                latest: Instant::now(),
                pending_rows: Default::default(),
                pending_num_bytes: Default::default(),
            }
        }

        fn reset(&mut self) {
            self.latest = Instant::now();
            self.pending_rows.clear();
            self.pending_num_bytes = 0;
        }
    }

    let mut accs: IntMap<EntityPath, Accumulator> = IntMap::default();

    fn do_push_row(acc: &mut Accumulator, row: PendingRow) {
        acc.pending_num_bytes += row.total_size_bytes();
        acc.pending_rows.push(row);
    }

    fn do_flush_all(
        acc: &mut Accumulator,
        tx_chunk: &Sender<Chunk>,
        reason: &str,
        chunk_max_rows_if_unsorted: u64,
    ) {
        let rows = std::mem::take(&mut acc.pending_rows);
        if rows.is_empty() {
            return;
        }

        re_log::trace!(
            "Flushing {} rows and {} bytes. Reason: {reason}",
            rows.len(),
            re_format::format_bytes(acc.pending_num_bytes as _)
        );

        let chunks =
            PendingRow::many_into_chunks(acc.entity_path.clone(), chunk_max_rows_if_unsorted, rows);
        for chunk in chunks {
            let mut chunk = match chunk {
                Ok(chunk) => chunk,
                Err(err) => {
                    re_log::error!(%err, "corrupt chunk detected, dropping");
                    continue;
                }
            };

            let split_indicators = chunk.split_indicators();
            if !chunk.components.is_empty() {
                tx_chunk.send(chunk).ok();
            }
            if let Some(split_indicators) = split_indicators {
                tx_chunk.send(split_indicators).ok();
            }
        }

        acc.reset();
    }

    re_log::trace!(
        "Flushing every: {:.2}s, {} rows, {}",
        config.flush_tick.as_secs_f64(),
        config.flush_num_rows,
        re_format::format_bytes(config.flush_num_bytes as _),
    );

    let mut skip_next_tick = false;

    use crossbeam::select;
    loop {
        select! {
            recv(rx_cmd) -> cmd => {
                let Ok(cmd) = cmd else {
                    break;
                };

                match cmd {
                    Command::AppendChunk(mut chunk) => {
                        let split_indicators = chunk.split_indicators();
                        if !chunk.components.is_empty() {
                            tx_chunk.send(chunk).ok();
                        }
                        if let Some(split_indicators) = split_indicators {
                            tx_chunk.send(split_indicators).ok();
                        }
                    },
                    Command::AppendRow(entity_path, row) => {
                        let acc = accs.entry(entity_path.clone())
                            .or_insert_with(|| Accumulator::new(entity_path));
                        do_push_row(acc, row);

                        if let Some(config) = config.hooks.on_insert.as_ref() {
                            config(&acc.pending_rows);
                        }

                        if acc.pending_rows.len() as u64 >= config.flush_num_rows {
                            do_flush_all(acc, &tx_chunk, "rows", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        } else if acc.pending_num_bytes >= config.flush_num_bytes {
                            do_flush_all(acc, &tx_chunk, "bytes", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        }
                    },

                    Command::Flush(oneshot) => {
                        skip_next_tick = true;
                        for acc in accs.values_mut() {
                            do_flush_all(acc, &tx_chunk, "manual", config.chunk_max_rows_if_unsorted);
                        }
                        drop(oneshot);
                    },

                    Command::Shutdown => break,
                };
            },

            recv(rx_tick) -> _ => {
                if skip_next_tick {
                    skip_next_tick = false;
                } else {
                    for acc in accs.values_mut() {
                        do_flush_all(acc, &tx_chunk, "tick", config.chunk_max_rows_if_unsorted);
                    }
                }
            },
        };
    }

    drop(rx_cmd);
    for acc in accs.values_mut() {
        do_flush_all(
            acc,
            &tx_chunk,
            "shutdown",
            config.chunk_max_rows_if_unsorted,
        );
    }
    drop(tx_chunk);
}

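/// A single row of data, waiting to be batched together with other rows for the same entity and
/// turned into a [`Chunk`].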
#[derive(Debug, Clone)]
pub struct PendingRow {
    pub row_id: RowId,

    pub timepoint: TimePoint,

    pub components: IntMap<ComponentDescriptor, ArrayRef>,
}

impl PendingRow {
    #[inline]
    pub fn new(timepoint: TimePoint, components: IntMap<ComponentDescriptor, ArrayRef>) -> Self {
        Self {
            row_id: RowId::new(),
            timepoint,
            components,
        }
    }
}

impl re_byte_size::SizeBytes for PendingRow {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        row_id.heap_size_bytes() + timepoint.heap_size_bytes() + components.heap_size_bytes()
    }
}

impl PendingRow {
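    /// Turns this single row into a single-row [`Chunk`].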
    pub fn into_chunk(self, entity_path: EntityPath) -> ChunkResult<Chunk> {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        let timelines = timepoint
            .into_iter()
            .map(|(timeline_name, cell)| {
                let times = ArrowScalarBuffer::from(vec![cell.as_i64()]);
                let time_column =
                    TimeColumn::new(Some(true), Timeline::new(timeline_name, cell.typ()), times);
                (timeline_name, time_column)
            })
            .collect();

        let mut per_name = ChunkComponents::default();
        for (component_desc, array) in components {
            let list_array = arrays_to_list_array_opt(&[Some(&*array as _)]);
            if let Some(list_array) = list_array {
                per_name.insert_descriptor(component_desc, list_array);
            }
        }

        Chunk::from_native_row_ids(
            ChunkId::new(),
            entity_path,
            Some(true),
            &[row_id],
            timelines,
            per_name,
        )
    }

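    /// Turns a batch of rows targeting a single entity into as few [`Chunk`]s as possible.
    ///
    /// Rows are sorted by `RowId`, then grouped by their set of timelines and by the datatypes of
    /// their components, so that each group can be stacked into a single chunk. A group whose
    /// time columns turn out to be unsorted is split further once it grows past
    /// `chunk_max_rows_if_unsorted`.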
    pub fn many_into_chunks(
        entity_path: EntityPath,
        chunk_max_rows_if_unsorted: u64,
        mut rows: Vec<Self>,
    ) -> impl Iterator<Item = ChunkResult<Chunk>> {
        re_tracing::profile_function!();

        {
            re_tracing::profile_scope!("sort rows");
            rows.sort_by_key(|row| row.row_id);
        }

        let mut per_timeline_set: IntMap<u64, Vec<Self>> = Default::default();
        {
            re_tracing::profile_scope!("compute timeline sets");

            for row in rows {
                let mut hasher = ahash::AHasher::default();
                row.timepoint.timeline_names().for_each(|timeline| {
                    <TimelineName as std::hash::Hash>::hash(timeline, &mut hasher);
                });

                per_timeline_set
                    .entry(hasher.finish())
                    .or_default()
                    .push(row);
            }
        }

        per_timeline_set.into_values().flat_map(move |rows| {
            re_tracing::profile_scope!("iterate per timeline set");

            let mut per_datatype_set: IntMap<u64, Vec<Self>> = Default::default();
            {
                re_tracing::profile_scope!("compute datatype sets");

                for row in rows {
                    let mut hasher = ahash::AHasher::default();
                    row.components
                        .values()
                        .for_each(|array| array.data_type().hash(&mut hasher));
                    per_datatype_set
                        .entry(hasher.finish())
                        .or_default()
                        .push(row);
                }
            }

            let entity_path = entity_path.clone();
            per_datatype_set.into_values().flat_map(move |rows| {
                re_tracing::profile_scope!("iterate per datatype set");

                let mut row_ids: Vec<RowId> = Vec::with_capacity(rows.len());
                let mut timelines: IntMap<TimelineName, PendingTimeColumn> = IntMap::default();

                let mut all_components: IntMap<ComponentDescriptor, Vec<Option<&dyn ArrowArray>>> =
                    IntMap::default();
                for row in &rows {
                    for component_desc in row.components.keys() {
                        all_components.entry(component_desc.clone()).or_default();
                    }
                }

                let mut chunks = Vec::new();

                let mut components = all_components.clone();
                for row in &rows {
                    let Self {
                        row_id,
                        timepoint: row_timepoint,
                        components: row_components,
                    } = row;

                    for (&timeline_name, cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });

                        if !row_ids.is_empty()
                            && row_ids.len() as u64 >= chunk_max_rows_if_unsorted
                            && !time_column.is_sorted
                        {
                            chunks.push(Chunk::from_native_row_ids(
                                ChunkId::new(),
                                entity_path.clone(),
                                Some(true),
                                &std::mem::take(&mut row_ids),
                                std::mem::take(&mut timelines)
                                    .into_iter()
                                    .map(|(name, time_column)| (name, time_column.finish()))
                                    .collect(),
                                {
                                    let mut per_name = ChunkComponents::default();
                                    for (component_desc, arrays) in std::mem::take(&mut components)
                                    {
                                        let list_array = arrays_to_list_array_opt(&arrays);
                                        if let Some(list_array) = list_array {
                                            per_name.insert_descriptor(component_desc, list_array);
                                        }
                                    }
                                    per_name
                                },
                            ));

                            components = all_components.clone();
                        }
                    }

                    row_ids.push(*row_id);

                    for (&timeline_name, &cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });
                        time_column.push(cell.into());
                    }

                    for (component_desc, arrays) in &mut components {
                        arrays.push(
                            row_components
                                .get(component_desc)
                                .map(|array| &**array as &dyn ArrowArray),
                        );
                    }
                }

                chunks.push(Chunk::from_native_row_ids(
                    ChunkId::new(),
                    entity_path.clone(),
                    Some(true),
                    &std::mem::take(&mut row_ids),
                    timelines
                        .into_iter()
                        .map(|(timeline, time_column)| (timeline, time_column.finish()))
                        .collect(),
                    {
                        let mut per_name = ChunkComponents::default();
                        for (component_desc, arrays) in components {
                            let list_array = arrays_to_list_array_opt(&arrays);
                            if let Some(list_array) = list_array {
                                per_name.insert_descriptor(component_desc, list_array);
                            }
                        }
                        per_name
                    },
                ));

                chunks
            })
        })
    }
}

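/// Helper to incrementally build a [`TimeColumn`] while keeping track of its sortedness and time
/// range.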
struct PendingTimeColumn {
    timeline: Timeline,
    times: Vec<i64>,
    is_sorted: bool,
    time_range: ResolvedTimeRange,
}

impl PendingTimeColumn {
    fn new(timeline: Timeline) -> Self {
        Self {
            timeline,
            times: Default::default(),
            is_sorted: true,
            time_range: ResolvedTimeRange::EMPTY,
        }
    }

    fn push(&mut self, time: TimeInt) {
        let Self {
            timeline: _,
            times,
            is_sorted,
            time_range,
        } = self;

        *is_sorted &= times.last().copied().unwrap_or(TimeInt::MIN.as_i64()) <= time.as_i64();
        time_range.set_min(TimeInt::min(time_range.min(), time));
        time_range.set_max(TimeInt::max(time_range.max(), time));
        times.push(time.as_i64());
    }

    fn finish(self) -> TimeColumn {
        let Self {
            timeline,
            times,
            is_sorted,
            time_range,
        } = self;

        TimeColumn {
            timeline,
            times: ArrowScalarBuffer::from(times),
            is_sorted,
            time_range,
        }
    }
}

#[cfg(test)]
mod tests {
    use crossbeam::channel::TryRecvError;

    use re_log_types::example_components::{MyColor, MyIndex, MyLabel, MyPoint, MyPoint64};
    use re_types_core::{Component as _, Loggable as _};

    use super::*;

    #[test]
    fn simple() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        let components1 = [
            (MyPoint::descriptor(), points1.clone()),
            (MyLabel::descriptor(), labels1.clone()),
            (MyIndex::descriptor(), indices1.clone()),
        ];
        let components2 = [
            (MyPoint::descriptor(), points2.clone()),
            (MyLabel::descriptor(), labels2.clone()),
            (MyIndex::descriptor(), indices2.clone()),
        ];
        let components3 = [
            (MyPoint::descriptor(), points3.clone()),
            (MyLabel::descriptor(), labels3.clone()),
            (MyIndex::descriptor(), indices3.clone()),
        ];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher);

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 43, 44].into()),
            )];
            let expected_components = [
                (
                    MyPoint::descriptor(),
                    arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
                ),
                (
                    MyLabel::descriptor(),
                    arrays_to_list_array_opt(&[&*labels1, &*labels2, &*labels3].map(Some)).unwrap(),
                ),
                (
                    MyIndex::descriptor(),
                    arrays_to_list_array_opt(&[&*indices1, &*indices2, &*indices3].map(Some))
                        .unwrap(),
                ),
            ];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

    #[test]
    #[allow(clippy::len_zero)]
    fn simple_but_hashes_might_not_match() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        let components1 = [
            (MyIndex::descriptor(), indices1.clone()),
            (MyPoint::descriptor(), points1.clone()),
            (MyLabel::descriptor(), labels1.clone()),
        ];
        let components2 = [
            (MyPoint::descriptor(), points2.clone()),
            (MyLabel::descriptor(), labels2.clone()),
            (MyIndex::descriptor(), indices2.clone()),
        ];
        let components3 = [
            (MyLabel::descriptor(), labels3.clone()),
            (MyIndex::descriptor(), indices3.clone()),
            (MyPoint::descriptor(), points3.clone()),
        ];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher);

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert!(chunks.len() >= 1);

        Ok(())
    }

    #[test]
    #[allow(clippy::zero_sized_map_values)]
    fn intmap_order_is_deterministic() {
        let descriptors = [
            MyPoint::descriptor(),
            MyColor::descriptor(),
            MyLabel::descriptor(),
            MyPoint64::descriptor(),
            MyIndex::descriptor(),
        ];

        let expected: IntMap<ComponentDescriptor, ()> =
            descriptors.iter().cloned().map(|d| (d, ())).collect();
        let expected: Vec<_> = expected.into_keys().collect();

        for _ in 0..1_000 {
            let got_collect: IntMap<ComponentDescriptor, ()> =
                descriptors.clone().into_iter().map(|d| (d, ())).collect();
            let got_collect: Vec<_> = got_collect.into_keys().collect();

            let mut got_insert: IntMap<ComponentDescriptor, ()> = Default::default();
            for d in descriptors.clone() {
                got_insert.insert(d, ());
            }
            let got_insert: Vec<_> = got_insert.into_keys().collect();

            assert_eq!(expected, got_collect);
            assert_eq!(expected, got_insert);
        }
    }

    #[test]
    fn simple_static() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let static_ = TimePoint::default();

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())];
        let components3 = [(MyPoint::descriptor(), points3.clone())];

        let row1 = PendingRow::new(static_.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(static_.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(static_.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher);

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

    #[test]
    fn different_entities() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())];
        let components3 = [(MyPoint::descriptor(), points3.clone())];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "ent1".into();
        let entity_path2: EntityPath = "ent2".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path2.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher);

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
            )];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row2.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![43].into()),
            )];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path2.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

    #[test]
    fn different_timelines() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline1, 43)
            .with(timeline2, 1000);
        let timepoint3 = TimePoint::default()
            .with(timeline1, 44)
            .with(timeline2, 1001);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())];
        let components3 = [(MyPoint::descriptor(), points3.clone())];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher);

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42].into()),
            )];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row2.row_id, row3.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(true), timeline1, vec![43, 44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(true), timeline2, vec![1000, 1001].into()),
                ),
            ];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

    #[test]
    fn different_datatypes() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 =
            MyPoint64::to_arrow([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())];
        let components3 = [(MyPoint::descriptor(), points3.clone())];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher);

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
            )];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row2.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![43].into()),
            )];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

    #[test]
    fn unsorted_timeline_below_threshold() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig {
            chunk_max_rows_if_unsorted: 1000,
            ..ChunkBatcherConfig::NEVER
        })?;

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_duration("frame_nr");

        let timepoint1 = TimePoint::default()
            .with(timeline2, 1000)
            .with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline2, 1001)
            .with(timeline1, 43);
        let timepoint3 = TimePoint::default()
            .with(timeline2, 1002)
            .with(timeline1, 44);
        let timepoint4 = TimePoint::default()
            .with(timeline2, 1003)
            .with(timeline1, 45);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
        let points4 =
            MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())];
        let components3 = [(MyPoint::descriptor(), points3.clone())];
        let components4 = [(MyPoint::descriptor(), points4.clone())];

        let row1 = PendingRow::new(timepoint4.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint1.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint2.clone(), components3.into_iter().collect());
        let row4 = PendingRow::new(timepoint3.clone(), components4.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());
        batcher.push_row(entity_path1.clone(), row4.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher);

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id, row4.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(false), timeline1, vec![45, 42, 43, 44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001, 1002].into()),
                ),
            ];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3, &*points4].map(Some))
                    .unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

    #[test]
    fn unsorted_timeline_above_threshold() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig {
            chunk_max_rows_if_unsorted: 3,
            ..ChunkBatcherConfig::NEVER
        })?;

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_duration("frame_nr");

        let timepoint1 = TimePoint::default()
            .with(timeline2, 1000)
            .with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline2, 1001)
            .with(timeline1, 43);
        let timepoint3 = TimePoint::default()
            .with(timeline2, 1002)
            .with(timeline1, 44);
        let timepoint4 = TimePoint::default()
            .with(timeline2, 1003)
            .with(timeline1, 45);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
        let points4 =
            MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())];
        let components3 = [(MyPoint::descriptor(), points3.clone())];
        let components4 = [(MyPoint::descriptor(), points4.clone())];

        let row1 = PendingRow::new(timepoint4.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint1.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint2.clone(), components3.into_iter().collect());
        let row4 = PendingRow::new(timepoint3.clone(), components4.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());
        batcher.push_row(entity_path1.clone(), row4.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher);

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(false), timeline1, vec![45, 42, 43].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001].into()),
                ),
            ];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row4.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(true), timeline1, vec![44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(true), timeline2, vec![1002].into()),
                ),
            ];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }
}