use std::{
    hash::{Hash as _, Hasher as _},
    sync::Arc,
    time::{Duration, Instant},
};

use arrow::array::{Array as ArrowArray, ArrayRef};
use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use crossbeam::channel::{Receiver, Sender};
use nohash_hasher::IntMap;

use re_arrow_util::arrays_to_list_array_opt;
use re_byte_size::SizeBytes as _;
use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, TimePoint, Timeline, TimelineName};
use re_types_core::ComponentDescriptor;

use crate::{Chunk, ChunkId, ChunkResult, RowId, TimeColumn, chunk::ChunkComponents};

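/// Error returned by [`ChunkBatcher::flush_blocking`] when a flush cannot complete.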
#[derive(Debug, thiserror::Error)]
pub enum BatcherFlushError {
    #[error("Batcher stopped before flushing completed")]
    Closed,

    #[error("Batcher flush timed out - not all messages were sent")]
    Timeout,
}

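/// Errors that can occur when creating a [`ChunkBatcher`] or parsing its configuration.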
#[derive(thiserror::Error, Debug)]
pub enum ChunkBatcherError {
    #[error("Failed to parse config: '{name}={value}': {err}")]
    ParseConfig {
        name: &'static str,
        value: String,
        err: Box<dyn std::error::Error + Send + Sync>,
    },

    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        name: &'static str,
        err: Box<dyn std::error::Error + Send + Sync>,
    },
}

pub type ChunkBatcherResult<T> = Result<T, ChunkBatcherError>;

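/// Optional callbacks hooked into the [`ChunkBatcher`]'s lifecycle.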
#[derive(Clone, Default)]
pub struct BatcherHooks {
    /// Called each time a row is appended, with all rows currently pending for that entity.
    #[expect(clippy::type_complexity)]
    pub on_insert: Option<Arc<dyn Fn(&[PendingRow]) + Send + Sync>>,

    /// Called whenever the batcher's configuration changes, including at startup.
    #[expect(clippy::type_complexity)]
    pub on_config_change: Option<Arc<dyn Fn(&ChunkBatcherConfig) + Send + Sync>>,

    /// Called when an Arrow record batch is released; see
    /// [`re_log_types::ArrowRecordBatchReleaseCallback`].
    pub on_release: Option<re_log_types::ArrowRecordBatchReleaseCallback>,
}

impl BatcherHooks {
    pub const NONE: Self = Self {
        on_insert: None,
        on_config_change: None,
        on_release: None,
    };
}

impl PartialEq for BatcherHooks {
    fn eq(&self, other: &Self) -> bool {
        let Self {
            on_insert,
            on_config_change,
            on_release,
        } = self;

        let on_insert_eq = match (on_insert, &other.on_insert) {
            (Some(a), Some(b)) => Arc::ptr_eq(a, b),
            (None, None) => true,
            _ => false,
        };

        let on_config_change_eq = match (on_config_change, &other.on_config_change) {
            (Some(a), Some(b)) => Arc::ptr_eq(a, b),
            (None, None) => true,
            _ => false,
        };

        on_insert_eq && on_config_change_eq && on_release == &other.on_release
    }
}

impl std::fmt::Debug for BatcherHooks {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            on_insert,
            on_config_change,
            on_release,
        } = self;
        f.debug_struct("BatcherHooks")
            .field("on_insert", &on_insert.as_ref().map(|_| "…"))
            .field("on_config_change", &on_config_change.as_ref().map(|_| "…"))
            .field("on_release", &on_release)
            .finish()
    }
}

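/// Defines the flush thresholds and channel capacities of a [`ChunkBatcher`].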
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkBatcherConfig {
    /// Flush whenever this much time has elapsed since the last flush.
    pub flush_tick: Duration,

    /// Flush once the accumulated payload reaches this many bytes.
    pub flush_num_bytes: u64,

    /// Flush once this many rows have accumulated.
    pub flush_num_rows: u64,

    /// Split chunks that reach this many rows if any of their timelines is unsorted.
    pub chunk_max_rows_if_unsorted: u64,

    /// Capacity of the internal command channel; unbounded if unset.
    pub max_commands_in_flight: Option<u64>,

    /// Capacity of the outbound chunk channel; unbounded if unset.
    pub max_chunks_in_flight: Option<u64>,
}

impl Default for ChunkBatcherConfig {
    fn default() -> Self {
        Self::DEFAULT
    }
}

impl ChunkBatcherConfig {
    /// Default configuration, applicable to most use cases.
    pub const DEFAULT: Self = Self {
        flush_tick: Duration::from_millis(200),
        flush_num_bytes: 1024 * 1024, // 1 MiB
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
    };

    /// Low-latency configuration: flushes on a much faster tick than [`Self::DEFAULT`].
    pub const LOW_LATENCY: Self = Self {
        flush_tick: Duration::from_millis(8),
        ..Self::DEFAULT
    };

    /// Flushes as soon as any data comes in.
    pub const ALWAYS: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: 0,
        flush_num_rows: 0,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
    };

    /// Never flushes, unless explicitly asked to (or on shutdown).
    pub const NEVER: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: u64::MAX,
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
    };

    /// Environment variable overriding [`Self::flush_tick`] (in seconds).
    pub const ENV_FLUSH_TICK: &'static str = "RERUN_FLUSH_TICK_SECS";

    /// Environment variable overriding [`Self::flush_num_bytes`].
    pub const ENV_FLUSH_NUM_BYTES: &'static str = "RERUN_FLUSH_NUM_BYTES";

    /// Environment variable overriding [`Self::flush_num_rows`].
    pub const ENV_FLUSH_NUM_ROWS: &'static str = "RERUN_FLUSH_NUM_ROWS";

    /// Environment variable overriding [`Self::chunk_max_rows_if_unsorted`].
    pub const ENV_CHUNK_MAX_ROWS_IF_UNSORTED: &'static str = "RERUN_CHUNK_MAX_ROWS_IF_UNSORTED";

    /// Deprecated spelling of [`Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED`]; still honored.
    #[deprecated(note = "use `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` instead")]
    const ENV_MAX_CHUNK_ROWS_IF_UNSORTED: &'static str = "RERUN_MAX_CHUNK_ROWS_IF_UNSORTED";

    /// Creates a new configuration, starting from the default values and applying any
    /// overrides found in the environment.
    ///
    /// See [`Self::apply_env`].
    #[inline]
    pub fn from_env() -> ChunkBatcherResult<Self> {
        Self::default().apply_env()
    }

    /// Returns a copy of `self`, overriding fields with values from the environment where set.
    ///
    /// See the associated `ENV_*` constants for the recognized variables.
    pub fn apply_env(&self) -> ChunkBatcherResult<Self> {
        let mut new = self.clone();

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_TICK) {
            let flush_duration_secs: f64 =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_TICK,
                    value: s.clone(),
                    err: Box::new(err),
                })?;

            new.flush_tick = Duration::try_from_secs_f64(flush_duration_secs).map_err(|err| {
                ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_TICK,
                    value: s.clone(),
                    err: Box::new(err),
                }
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) {
            if let Some(num_bytes) = re_format::parse_bytes(&s) {
                // Parsed as a human-readable size, e.g. "10MB".
                new.flush_num_bytes = num_bytes.unsigned_abs();
            } else {
                // No recognized size suffix; assume a plain number of bytes.
                new.flush_num_bytes = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_NUM_BYTES,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
            }
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_ROWS) {
            new.flush_num_rows = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                name: Self::ENV_FLUSH_NUM_ROWS,
                value: s.clone(),
                err: Box::new(err),
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        // Deprecated spelling, kept for backwards compatibility.
        #[expect(deprecated)]
        if let Ok(s) = std::env::var(Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        Ok(new)
    }
}

#[test]
fn chunk_batcher_config() {
    #![expect(unsafe_code)] // `std::env::set_var` is `unsafe`: the environment is process-global state

    unsafe {
        std::env::set_var("RERUN_FLUSH_TICK_SECS", "0.3");
        std::env::set_var("RERUN_FLUSH_NUM_BYTES", "42");
        std::env::set_var("RERUN_FLUSH_NUM_ROWS", "666");
        std::env::set_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED", "7777");
    }

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 7777,
        ..Default::default()
    };
    assert_eq!(expected, config);

    // The deprecated variable must still take effect.
    unsafe {
        std::env::set_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED", "9999");
    }

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 9999,
        ..Default::default()
    };
    assert_eq!(expected, config);
}

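/// Accumulates [`PendingRow`]s and turns them into [`Chunk`]s in a background thread, based
/// on the thresholds defined in the active [`ChunkBatcherConfig`].
///
/// Cheaply clonable: all clones share the same channels and the same batching thread, which
/// shuts down (after one final flush) when the last clone is dropped.
///
/// Minimal usage sketch (not compiled; `entity_path` and `row` are placeholders):
///
/// ```ignore
/// let batcher = ChunkBatcher::new(ChunkBatcherConfig::DEFAULT, BatcherHooks::NONE)?;
/// batcher.push_row(entity_path, row);
/// for chunk in batcher.chunks() {
///     // forward the batched chunk to storage / transport
/// }
/// ```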
#[derive(Clone)]
pub struct ChunkBatcher {
    inner: Arc<ChunkBatcherInner>,
}

struct ChunkBatcherInner {
    /// Commands from all clones of the batcher are funneled to the batching thread through this.
    tx_cmds: Sender<Command>,

    /// Receiving end of the output chunk stream; only ever `None` during `Drop`.
    rx_chunks: Option<Receiver<Chunk>>,

    /// Join handle of the batching thread, joined on `Drop`.
    cmds_to_chunks_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for ChunkBatcherInner {
    fn drop(&mut self) {
        // If the output channel still contains chunks that nobody will ever receive,
        // make the data loss visible.
        if let Some(rx_chunks) = self.rx_chunks.take()
            && !rx_chunks.is_empty()
        {
            re_log::warn!("Dropping data in ChunkBatcher");
        }

        // Ask the batching thread to shut down, then wait for its final flush to complete.
        self.tx_cmds.send(Command::Shutdown).ok();
        if let Some(handle) = self.cmds_to_chunks_handle.take() {
            handle.join().ok();
        }
    }
}

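/// Commands handled by the batching thread.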
enum Command {
    AppendChunk(Chunk),
    AppendRow(EntityPath, PendingRow),
    Flush { on_done: Sender<()> },
    UpdateConfig(ChunkBatcherConfig),
    Shutdown,
}

impl Command {
    fn flush() -> (Self, Receiver<()>) {
        let (tx, rx) = crossbeam::channel::bounded(1); // oneshot
        (Self::Flush { on_done: tx }, rx)
    }
}

impl ChunkBatcher {
    /// Creates a new [`ChunkBatcher`] with the given configuration and hooks, and spawns the
    /// background batching thread.
    #[must_use = "Batching threads will automatically shutdown when this object is dropped"]
    #[expect(clippy::needless_pass_by_value)]
    pub fn new(config: ChunkBatcherConfig, hooks: BatcherHooks) -> ChunkBatcherResult<Self> {
        let (tx_cmds, rx_cmd) = match config.max_commands_in_flight {
            Some(cap) => crossbeam::channel::bounded(cap as _),
            None => crossbeam::channel::unbounded(),
        };

        let (tx_chunk, rx_chunks) = match config.max_chunks_in_flight {
            Some(cap) => crossbeam::channel::bounded(cap as _),
            None => crossbeam::channel::unbounded(),
        };

        let cmds_to_chunks_handle = {
            const NAME: &str = "ChunkBatcher::cmds_to_chunks";
            std::thread::Builder::new()
                .name(NAME.into())
                .spawn({
                    let config = config.clone();
                    move || batching_thread(config, hooks, rx_cmd, tx_chunk)
                })
                .map_err(|err| ChunkBatcherError::SpawnThread {
                    name: NAME,
                    err: Box::new(err),
                })?
        };

        re_log::debug!(?config, "creating new chunk batcher");

        let inner = ChunkBatcherInner {
            tx_cmds,
            rx_chunks: Some(rx_chunks),
            cmds_to_chunks_handle: Some(cmds_to_chunks_handle),
        };

        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    /// Pushes a [`Chunk`] down the pipeline: it is forwarded to the output channel as-is,
    /// without being batched.
    pub fn push_chunk(&self, chunk: Chunk) {
        self.inner.push_chunk(chunk);
    }

    /// Pushes a [`PendingRow`] down the pipeline, to be batched with other rows logged to the
    /// same entity.
    #[inline]
    pub fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.inner.push_row(entity_path, row);
    }

    /// Initiates a flush of the pipeline and returns immediately.
    ///
    /// The flushed data will eventually show up on the output channel.
    #[inline]
    pub fn flush_async(&self) {
        self.inner.flush_async();
    }

    /// Initiates a flush of the pipeline and blocks until it completes or `timeout` expires.
    #[inline]
    pub fn flush_blocking(&self, timeout: Duration) -> Result<(), BatcherFlushError> {
        self.inner.flush_blocking(timeout)
    }

    /// Updates the batcher's configuration.
    ///
    /// Note that the channel capacities cannot be changed once the batcher has been created.
    pub fn update_config(&self, config: ChunkBatcherConfig) {
        self.inner.update_config(config);
    }

    /// Returns the receiving end of the output chunk channel.
    ///
    /// It is closed once the batcher shuts down, after one final flush.
    pub fn chunks(&self) -> Receiver<Chunk> {
        // `rx_chunks` is only ever taken in `Drop`, so it is always set here.
        #[expect(clippy::unwrap_used)]
        self.inner.rx_chunks.clone().unwrap()
    }
}

impl ChunkBatcherInner {
    fn push_chunk(&self, chunk: Chunk) {
        self.send_cmd(Command::AppendChunk(chunk));
    }

    fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.send_cmd(Command::AppendRow(entity_path, row));
    }

    fn flush_async(&self) {
        let (flush_cmd, _) = Command::flush();
        self.send_cmd(flush_cmd);
    }

    fn flush_blocking(&self, timeout: Duration) -> Result<(), BatcherFlushError> {
        use crossbeam::channel::RecvTimeoutError;

        let (flush_cmd, on_done) = Command::flush();
        self.send_cmd(flush_cmd);

        on_done.recv_timeout(timeout).map_err(|err| match err {
            RecvTimeoutError::Timeout => BatcherFlushError::Timeout,
            RecvTimeoutError::Disconnected => BatcherFlushError::Closed,
        })
    }

    fn update_config(&self, config: ChunkBatcherConfig) {
        self.send_cmd(Command::UpdateConfig(config));
    }

    fn send_cmd(&self, cmd: Command) {
        // Sending can only fail if the batching thread has already shut down,
        // in which case there is nothing left to do.
        self.tx_cmds.send(cmd).ok();
    }
}

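/// The background thread of a [`ChunkBatcher`]: consumes [`Command`]s, accumulates rows per
/// entity, and emits [`Chunk`]s whenever one of the configured thresholds is crossed.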
#[expect(clippy::needless_pass_by_value)]
fn batching_thread(
    mut config: ChunkBatcherConfig,
    hooks: BatcherHooks,
    rx_cmd: Receiver<Command>,
    tx_chunk: Sender<Chunk>,
) {
    let mut rx_tick = crossbeam::channel::tick(config.flush_tick);

    struct Accumulator {
        latest: Instant,
        entity_path: EntityPath,
        pending_rows: Vec<PendingRow>,
        pending_num_bytes: u64,
    }

    impl Accumulator {
        fn new(entity_path: EntityPath) -> Self {
            Self {
                entity_path,
                latest: Instant::now(),
                pending_rows: Default::default(),
                pending_num_bytes: Default::default(),
            }
        }

        fn reset(&mut self) {
            self.latest = Instant::now();
            self.pending_rows.clear();
            self.pending_num_bytes = 0;
        }
    }

    let mut accs: IntMap<EntityPath, Accumulator> = IntMap::default();

    fn do_push_row(acc: &mut Accumulator, row: PendingRow) {
        acc.pending_num_bytes += row.total_size_bytes();
        acc.pending_rows.push(row);
    }

    fn do_flush_all(
        acc: &mut Accumulator,
        tx_chunk: &Sender<Chunk>,
        reason: &str,
        chunk_max_rows_if_unsorted: u64,
    ) {
        let rows = std::mem::take(&mut acc.pending_rows);
        if rows.is_empty() {
            return;
        }

        re_log::trace!(
            "Flushing {} rows and {} bytes. Reason: {reason}",
            rows.len(),
            re_format::format_bytes(acc.pending_num_bytes as _)
        );

        let chunks =
            PendingRow::many_into_chunks(acc.entity_path.clone(), chunk_max_rows_if_unsorted, rows);
        for chunk in chunks {
            let chunk = match chunk {
                Ok(chunk) => chunk,
                Err(err) => {
                    re_log::error!(%err, "corrupt chunk detected, dropping");
                    continue;
                }
            };

            // Chunks without any component data are useless; don't forward them.
            if !chunk.components.is_empty() {
                tx_chunk.send(chunk).ok();
            } else {
                re_log::warn_once!(
                    "Dropping chunk without components. Entity path: {}",
                    chunk.entity_path()
                );
            }
        }

        acc.reset();
    }

    re_log::trace!(
        "Flushing every: {:.2}s, {} rows, {}",
        config.flush_tick.as_secs_f64(),
        config.flush_num_rows,
        re_format::format_bytes(config.flush_num_bytes as _),
    );
    if let Some(on_config_change) = hooks.on_config_change.as_ref() {
        on_config_change(&config);
    }

    // Set whenever a threshold-triggered flush just happened, so that the next tick doesn't
    // redundantly flush again right away.
    let mut skip_next_tick = false;

    use crossbeam::select;
    loop {
        select! {
            recv(rx_cmd) -> cmd => {
                let Ok(cmd) = cmd else {
                    // All command senders are gone, which can only happen if the batcher
                    // itself was dropped: flush everything and shut down.
                    break;
                };

                match cmd {
                    Command::AppendChunk(chunk) => {
                        // Chunks without any component data are useless; don't forward them.
                        if !chunk.components.is_empty() {
                            tx_chunk.send(chunk).ok();
                        } else {
                            re_log::warn_once!(
                                "Dropping chunk without components. Entity path: {}",
                                chunk.entity_path()
                            );
                        }
                    },
                    Command::AppendRow(entity_path, row) => {
                        let acc = accs.entry(entity_path.clone())
                            .or_insert_with(|| Accumulator::new(entity_path));
                        do_push_row(acc, row);

                        if let Some(on_insert) = hooks.on_insert.as_ref() {
                            on_insert(&acc.pending_rows);
                        }

                        if acc.pending_rows.len() as u64 >= config.flush_num_rows {
                            do_flush_all(acc, &tx_chunk, "rows", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        } else if acc.pending_num_bytes >= config.flush_num_bytes {
                            do_flush_all(acc, &tx_chunk, "bytes", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        }
                    },

                    Command::Flush { on_done } => {
                        skip_next_tick = true;
                        for acc in accs.values_mut() {
                            do_flush_all(acc, &tx_chunk, "manual", config.chunk_max_rows_if_unsorted);
                        }
                        on_done.send(()).ok();
                    },

                    Command::UpdateConfig(new_config) => {
                        // The channels are created once, at startup: their capacities cannot
                        // be changed after the fact.
                        if config.max_commands_in_flight != new_config.max_commands_in_flight
                            || config.max_chunks_in_flight != new_config.max_chunks_in_flight
                        {
                            re_log::warn!(
                                "Cannot change max commands/chunks in flight after the batcher has been created. \
                                 Previous max commands/chunks: {:?}/{:?}, new max commands/chunks: {:?}/{:?}",
                                config.max_commands_in_flight,
                                config.max_chunks_in_flight,
                                new_config.max_commands_in_flight,
                                new_config.max_chunks_in_flight,
                            );
                        }

                        re_log::trace!("Updated batcher config: {:?}", new_config);
                        if let Some(on_config_change) = hooks.on_config_change.as_ref() {
                            on_config_change(&new_config);
                        }

                        config = new_config;
                        rx_tick = crossbeam::channel::tick(config.flush_tick);
                    }

                    Command::Shutdown => break,
                }
            },

            recv(rx_tick) -> _ => {
                if skip_next_tick {
                    skip_next_tick = false;
                } else {
                    for acc in accs.values_mut() {
                        do_flush_all(acc, &tx_chunk, "tick", config.chunk_max_rows_if_unsorted);
                    }
                }
            },
        };
    }

    drop(rx_cmd);
    for acc in accs.values_mut() {
        do_flush_all(
            acc,
            &tx_chunk,
            "shutdown",
            config.chunk_max_rows_if_unsorted,
        );
    }
    drop(tx_chunk);

    // Dropping `rx_cmd` and `tx_chunk` closes both channels: any further command sends and
    // chunk receives will error out with a disconnection.
}

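/// A single row's worth of data (e.g. a single log call), waiting to be batched into a chunk.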
#[derive(Debug, Clone)]
pub struct PendingRow {
    /// The unique ID of this row.
    pub row_id: RowId,

    /// The time point this row belongs to.
    pub timepoint: TimePoint,

    /// The component data, one Arrow array per component.
    pub components: IntMap<ComponentDescriptor, ArrayRef>,
}

impl PendingRow {
    #[inline]
    pub fn new(timepoint: TimePoint, components: IntMap<ComponentDescriptor, ArrayRef>) -> Self {
        Self {
            row_id: RowId::new(),
            timepoint,
            components,
        }
    }
}

impl re_byte_size::SizeBytes for PendingRow {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        row_id.heap_size_bytes() + timepoint.heap_size_bytes() + components.heap_size_bytes()
    }
}

impl PendingRow {
    /// Turn a single row into a chunk of its own.
    pub fn into_chunk(self, entity_path: EntityPath) -> ChunkResult<Chunk> {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        // Each timeline gets a single-entry, trivially sorted time column.
        let timelines = timepoint
            .into_iter()
            .map(|(timeline_name, cell)| {
                let times = ArrowScalarBuffer::from(vec![cell.as_i64()]);
                let time_column =
                    TimeColumn::new(Some(true), Timeline::new(timeline_name, cell.typ()), times);
                (timeline_name, time_column)
            })
            .collect();

        let mut per_desc = ChunkComponents::default();
        for (component_desc, array) in components {
            let list_array = arrays_to_list_array_opt(&[Some(&*array as _)]);
            if let Some(list_array) = list_array {
                per_desc.insert(component_desc, list_array);
            }
        }

        Chunk::from_native_row_ids(
            ChunkId::new(),
            entity_path,
            Some(true),
            &[row_id],
            timelines,
            per_desc,
        )
    }

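    /// Turns a batch of rows into as few [`Chunk`]s as possible.
    ///
    /// Rows are sorted by `RowId`, then partitioned by their set of timelines and by the Arrow
    /// datatypes of their components: rows that don't agree on both can never share a chunk.
    /// On top of that, a chunk whose timelines turn out unsorted is split once it reaches
    /// `chunk_max_rows_if_unsorted` rows.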
    pub fn many_into_chunks(
        entity_path: EntityPath,
        chunk_max_rows_if_unsorted: u64,
        mut rows: Vec<Self>,
    ) -> impl Iterator<Item = ChunkResult<Chunk>> {
        re_tracing::profile_function!();

        {
            // Rows are processed in `RowId` order so that the resulting chunks come out
            // row-sorted.
            re_tracing::profile_scope!("sort rows");
            rows.sort_by_key(|row| row.row_id);
        }

        let mut per_timeline_set: IntMap<u64 /* timeline set hash */, Vec<Self>> =
            Default::default();
        {
            re_tracing::profile_scope!("compute timeline sets");

            // Hash each row's set of timeline names: rows with different timeline sets
            // cannot share a chunk.
            for row in rows {
                let mut hasher = ahash::AHasher::default();
                row.timepoint.timeline_names().for_each(|timeline| {
                    <TimelineName as std::hash::Hash>::hash(timeline, &mut hasher);
                });

                per_timeline_set
                    .entry(hasher.finish())
                    .or_default()
                    .push(row);
            }
        }

        per_timeline_set.into_values().flat_map(move |rows| {
            re_tracing::profile_scope!("iterate per timeline set");

            let mut per_datatype_set: IntMap<u64 /* datatype set hash */, Vec<Self>> =
                Default::default();
            {
                re_tracing::profile_scope!("compute datatype sets");

                // Hash the Arrow datatypes of each row's components: a chunk column can only
                // hold a single datatype.
                for row in rows {
                    let mut hasher = ahash::AHasher::default();
                    row.components
                        .values()
                        .for_each(|array| array.data_type().hash(&mut hasher));
                    per_datatype_set
                        .entry(hasher.finish())
                        .or_default()
                        .push(row);
                }
            }

            let entity_path = entity_path.clone();
            per_datatype_set.into_values().flat_map(move |rows| {
                re_tracing::profile_scope!("iterate per datatype set");

                let mut row_ids: Vec<RowId> = Vec::with_capacity(rows.len());
                let mut timelines: IntMap<TimelineName, PendingTimeColumn> = IntMap::default();

                // Pre-register every component seen across all rows, so that each column is
                // present (padded with nulls) in every chunk.
                let mut all_components: IntMap<ComponentDescriptor, Vec<Option<&dyn ArrowArray>>> =
                    IntMap::default();
                for row in &rows {
                    for component_desc in row.components.keys() {
                        all_components.entry(component_desc.clone()).or_default();
                    }
                }

                let mut chunks = Vec::new();

                let mut components = all_components.clone();
                for row in &rows {
                    let Self {
                        row_id,
                        timepoint: row_timepoint,
                        components: row_components,
                    } = row;

                    for (&timeline_name, cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });

                        // If the current chunk is already past the threshold and one of its
                        // timelines is unsorted, split it off and start a fresh one.
                        if !row_ids.is_empty()
                            && row_ids.len() as u64 >= chunk_max_rows_if_unsorted
                            && !time_column.is_sorted
                        {
                            chunks.push(Chunk::from_native_row_ids(
                                ChunkId::new(),
                                entity_path.clone(),
                                Some(true),
                                &std::mem::take(&mut row_ids),
                                std::mem::take(&mut timelines)
                                    .into_iter()
                                    .map(|(name, time_column)| (name, time_column.finish()))
                                    .collect(),
                                {
                                    let mut per_desc = ChunkComponents::default();
                                    for (component_desc, arrays) in std::mem::take(&mut components)
                                    {
                                        let list_array = arrays_to_list_array_opt(&arrays);
                                        if let Some(list_array) = list_array {
                                            per_desc.insert(component_desc, list_array);
                                        }
                                    }
                                    per_desc
                                },
                            ));

                            components = all_components.clone();
                        }
                    }

                    row_ids.push(*row_id);

                    for (&timeline_name, &cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });
                        time_column.push(cell.into());
                    }

                    for (component_desc, arrays) in &mut components {
                        // This pushes `None` if the row doesn't contain this component,
                        // keeping all columns aligned.
                        arrays.push(
                            row_components
                                .get(component_desc)
                                .map(|array| &**array as &dyn ArrowArray),
                        );
                    }
                }

                chunks.push(Chunk::from_native_row_ids(
                    ChunkId::new(),
                    entity_path.clone(),
                    Some(true),
                    &std::mem::take(&mut row_ids),
                    timelines
                        .into_iter()
                        .map(|(timeline, time_column)| (timeline, time_column.finish()))
                        .collect(),
                    {
                        let mut per_desc = ChunkComponents::default();
                        for (component_desc, arrays) in components {
                            let list_array = arrays_to_list_array_opt(&arrays);
                            if let Some(list_array) = list_array {
                                per_desc.insert(component_desc, list_array);
                            }
                        }
                        per_desc
                    },
                ));

                chunks
            })
        })
    }
}

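/// A time column in the process of being built from individual time points.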
struct PendingTimeColumn {
    timeline: Timeline,
    times: Vec<i64>,
    is_sorted: bool,
    time_range: AbsoluteTimeRange,
}

impl PendingTimeColumn {
    fn new(timeline: Timeline) -> Self {
        Self {
            timeline,
            times: Default::default(),
            is_sorted: true,
            time_range: AbsoluteTimeRange::EMPTY,
        }
    }

    /// Appends a time value, updating the sortedness flag and the covered time range.
    fn push(&mut self, time: TimeInt) {
        let Self {
            timeline: _,
            times,
            is_sorted,
            time_range,
        } = self;

        *is_sorted &= times.last().copied().unwrap_or(TimeInt::MIN.as_i64()) <= time.as_i64();
        time_range.set_min(TimeInt::min(time_range.min(), time));
        time_range.set_max(TimeInt::max(time_range.max(), time));
        times.push(time.as_i64());
    }

    fn finish(self) -> TimeColumn {
        let Self {
            timeline,
            times,
            is_sorted,
            time_range,
        } = self;

        TimeColumn {
            timeline,
            times: ArrowScalarBuffer::from(times),
            is_sorted,
            time_range,
        }
    }
}

#[cfg(test)]
mod tests {
    use crossbeam::channel::TryRecvError;

    use re_log_types::example_components::{MyIndex, MyLabel, MyPoint, MyPoint64, MyPoints};
    use re_types_core::Loggable as _;

    use super::*;

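    // Pushes three rows to a single entity and expects them to land in a single chunk.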
    #[test]
    fn simple() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        let components1 = [
            (MyPoints::descriptor_points(), points1.clone()),
            (MyPoints::descriptor_labels(), labels1.clone()),
            (MyIndex::partial_descriptor(), indices1.clone()),
        ];
        let components2 = [
            (MyPoints::descriptor_points(), points2.clone()),
            (MyPoints::descriptor_labels(), labels2.clone()),
            (MyIndex::partial_descriptor(), indices2.clone()),
        ];
        let components3 = [
            (MyPoints::descriptor_points(), points3.clone()),
            (MyPoints::descriptor_labels(), labels3.clone()),
            (MyIndex::partial_descriptor(), indices3.clone()),
        ];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 43, 44].into()),
            )];
            let expected_components = [
                (
                    MyPoints::descriptor_points(),
                    arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
                ),
                (
                    MyPoints::descriptor_labels(),
                    arrays_to_list_array_opt(&[&*labels1, &*labels2, &*labels3].map(Some)).unwrap(),
                ),
                (
                    MyIndex::partial_descriptor(),
                    arrays_to_list_array_opt(&[&*indices1, &*indices2, &*indices3].map(Some))
                        .unwrap(),
                ),
            ];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

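    // Same as `simple`, but the rows' component maps are built in different insertion orders:
    // depending on how the per-datatype hashes come out, the rows may or may not end up in a
    // single chunk, so only a weak assertion is possible.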
    #[test]
    #[expect(clippy::len_zero)]
    fn simple_but_hashes_might_not_match() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        let components1 = [
            (MyIndex::partial_descriptor(), indices1.clone()),
            (MyPoints::descriptor_points(), points1.clone()),
            (MyPoints::descriptor_labels(), labels1.clone()),
        ];
        let components2 = [
            (MyPoints::descriptor_points(), points2.clone()),
            (MyPoints::descriptor_labels(), labels2.clone()),
            (MyIndex::partial_descriptor(), indices2.clone()),
        ];
        let components3 = [
            (MyPoints::descriptor_labels(), labels3.clone()),
            (MyIndex::partial_descriptor(), indices3.clone()),
            (MyPoints::descriptor_points(), points3.clone()),
        ];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        // All rows carry the same component set, but we cannot rely on the datatype hashes
        // matching across different map iteration orders; at least one chunk must come out.
        assert!(chunks.len() >= 1);

        Ok(())
    }

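    // `IntMap` iteration order must be deterministic for the batcher's set-hashing to make sense.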
    #[test]
    #[expect(clippy::zero_sized_map_values)]
    fn intmap_order_is_deterministic() {
        let descriptors = [
            MyPoints::descriptor_points(),
            MyPoints::descriptor_colors(),
            MyPoints::descriptor_labels(),
            MyPoint64::partial_descriptor(),
            MyIndex::partial_descriptor(),
        ];

        let expected: IntMap<ComponentDescriptor, ()> =
            descriptors.iter().cloned().map(|d| (d, ())).collect();
        let expected: Vec<_> = expected.into_keys().collect();

        for _ in 0..1_000 {
            let got_collect: IntMap<ComponentDescriptor, ()> =
                descriptors.clone().into_iter().map(|d| (d, ())).collect();
            let got_collect: Vec<_> = got_collect.into_keys().collect();

            let mut got_insert: IntMap<ComponentDescriptor, ()> = Default::default();
            for d in descriptors.clone() {
                got_insert.insert(d, ());
            }
            let got_insert: Vec<_> = got_insert.into_keys().collect();

            assert_eq!(expected, got_collect);
            assert_eq!(expected, got_insert);
        }
    }

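    // Rows with static (empty) timepoints should batch into a single chunk with no time columns.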
    #[test]
    fn simple_static() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let static_ = TimePoint::default();

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())];
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];

        let row1 = PendingRow::new(static_.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(static_.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(static_.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

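    // Rows logged to different entities must end up in different chunks.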
    #[test]
    fn different_entities() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())];
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "ent1".into();
        let entity_path2: EntityPath = "ent2".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path2.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row2.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![43].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path2.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

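    // Rows with different sets of timelines must end up in different chunks.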
    #[test]
    fn different_timelines() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline1, 43)
            .with(timeline2, 1000);
        let timepoint3 = TimePoint::default()
            .with(timeline1, 44)
            .with(timeline2, 1001);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())];
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row2.row_id, row3.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(true), timeline1, vec![43, 44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(true), timeline2, vec![1000, 1001].into()),
                ),
            ];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

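    // Rows whose components share a descriptor but not an Arrow datatype must end up in
    // different chunks.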
    #[test]
    fn different_datatypes() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 =
            MyPoint64::to_arrow([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())]; // same descriptor, different datatype
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row2.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![43].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

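    // An unsorted timeline that stays below `chunk_max_rows_if_unsorted` keeps all rows in a
    // single, unsorted chunk.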
    #[test]
    fn unsorted_timeline_below_threshold() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(
            ChunkBatcherConfig {
                chunk_max_rows_if_unsorted: 1000,
                ..ChunkBatcherConfig::NEVER
            },
            BatcherHooks::NONE,
        )?;

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_duration("frame_nr");

        let timepoint1 = TimePoint::default()
            .with(timeline2, 1000)
            .with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline2, 1001)
            .with(timeline1, 43);
        let timepoint3 = TimePoint::default()
            .with(timeline2, 1002)
            .with(timeline1, 44);
        let timepoint4 = TimePoint::default()
            .with(timeline2, 1003)
            .with(timeline1, 45);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
        let points4 =
            MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())];
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];
        let components4 = [(MyPoints::descriptor_points(), points4.clone())];

        // Note the scrambled timepoints: the resulting time columns are unsorted.
        let row1 = PendingRow::new(timepoint4.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint1.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint2.clone(), components3.into_iter().collect());
        let row4 = PendingRow::new(timepoint3.clone(), components4.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());
        batcher.push_row(entity_path1.clone(), row4.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id, row4.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(false), timeline1, vec![45, 42, 43, 44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001, 1002].into()),
                ),
            ];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3, &*points4].map(Some))
                    .unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

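    // An unsorted timeline that reaches `chunk_max_rows_if_unsorted` forces a chunk split.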
    #[test]
    fn unsorted_timeline_above_threshold() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(
            ChunkBatcherConfig {
                chunk_max_rows_if_unsorted: 3,
                ..ChunkBatcherConfig::NEVER
            },
            BatcherHooks::NONE,
        )?;

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_duration("frame_nr");

        let timepoint1 = TimePoint::default()
            .with(timeline2, 1000)
            .with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline2, 1001)
            .with(timeline1, 43);
        let timepoint3 = TimePoint::default()
            .with(timeline2, 1002)
            .with(timeline1, 44);
        let timepoint4 = TimePoint::default()
            .with(timeline2, 1003)
            .with(timeline1, 45);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
        let points4 =
            MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())];
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];
        let components4 = [(MyPoints::descriptor_points(), points4.clone())];

        // Note the scrambled timepoints: the resulting time columns are unsorted.
        let row1 = PendingRow::new(timepoint4.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint1.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint2.clone(), components3.into_iter().collect());
        let row4 = PendingRow::new(timepoint3.clone(), components4.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());
        batcher.push_row(entity_path1.clone(), row4.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(false), timeline1, vec![45, 42, 43].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001].into()),
                ),
            ];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row4.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(true), timeline1, vec![44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(true), timeline2, vec![1002].into()),
                ),
            ];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }
}