1use std::hash::{Hash as _, Hasher as _};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
6use nohash_hasher::IntMap;
7use re_arrow_util::arrays_to_list_array_opt;
8use re_byte_size::SizeBytes as _;
9use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, TimePoint, Timeline, TimelineName};
10use re_quota_channel::{Receiver, Sender};
11use re_types_core::{ComponentIdentifier, SerializedComponentBatch, SerializedComponentColumn};
12
13use crate::chunk::ChunkComponents;
14use crate::{Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
15
/// Errors that can occur while waiting for a [`ChunkBatcher`] flush to complete.
#[derive(Debug, thiserror::Error)]
pub enum BatcherFlushError {
    /// The batcher's background thread shut down before the flush completed.
    #[error("Batcher stopped before flushing completed")]
    Closed,

    /// The flush did not complete within the caller-provided timeout.
    #[error("Batcher flush timed out - not all messages were sent.")]
    Timeout,
}
27
/// Errors that can occur when creating/manipulating a [`ChunkBatcher`].
#[derive(thiserror::Error, Debug)]
pub enum ChunkBatcherError {
    /// A configuration value (e.g. from an environment variable) could not be parsed.
    #[error("Failed to parse config: '{name}={value}': {err}")]
    ParseConfig {
        name: &'static str,
        value: String,
        err: Box<dyn std::error::Error + Send + Sync>,
    },

    /// The background batching thread could not be spawned.
    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        name: &'static str,
        err: Box<dyn std::error::Error + Send + Sync>,
    },
}
46
/// Convenience alias for results whose error type is [`ChunkBatcherError`].
pub type ChunkBatcherResult<T> = Result<T, ChunkBatcherError>;
48
/// Callbacks that external code can register to observe the batcher's lifecycle.
///
/// All hooks are optional; use [`BatcherHooks::NONE`] to disable them all.
#[derive(Clone, Default)]
pub struct BatcherHooks {
    /// Called from the batching thread every time a row is appended to an
    /// entity's accumulator, with all of that entity's currently pending rows.
    #[expect(clippy::type_complexity)]
    pub on_insert: Option<Arc<dyn Fn(&[PendingRow]) + Send + Sync>>,

    /// Called whenever the active configuration changes (including once when the
    /// batching thread starts up).
    #[expect(clippy::type_complexity)]
    pub on_config_change: Option<Arc<dyn Fn(&ChunkBatcherConfig) + Send + Sync>>,

    /// Release callback forwarded downstream; not invoked anywhere in this file
    /// (presumably fired when an Arrow record batch is released — verify at call site).
    pub on_release: Option<re_log_types::ArrowRecordBatchReleaseCallback>,
}
75
impl BatcherHooks {
    /// A set of hooks with every callback disabled.
    pub const NONE: Self = Self {
        on_insert: None,
        on_config_change: None,
        on_release: None,
    };
}
83
84impl PartialEq for BatcherHooks {
85 fn eq(&self, other: &Self) -> bool {
86 let Self {
87 on_insert,
88 on_config_change,
89 on_release,
90 } = self;
91
92 let on_insert_eq = match (on_insert, &other.on_insert) {
93 (Some(a), Some(b)) => Arc::ptr_eq(a, b),
94 (None, None) => true,
95 _ => false,
96 };
97
98 let on_config_change_eq = match (on_config_change, &other.on_config_change) {
99 (Some(a), Some(b)) => Arc::ptr_eq(a, b),
100 (None, None) => true,
101 _ => false,
102 };
103
104 on_insert_eq && on_config_change_eq && on_release == &other.on_release
105 }
106}
107
108impl std::fmt::Debug for BatcherHooks {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 let Self {
111 on_insert,
112 on_config_change,
113 on_release,
114 } = self;
115 f.debug_struct("BatcherHooks")
116 .field("on_insert", &on_insert.as_ref().map(|_| "…"))
117 .field("on_config_change", &on_config_change.as_ref().map(|_| "…"))
118 .field("on_release", &on_release)
119 .finish()
120 }
121}
122
/// Defines the flush thresholds of a [`ChunkBatcher`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ChunkBatcherConfig {
    /// Period of the background tick; every tick flushes all accumulators,
    /// unless a threshold-based flush just happened (see `skip_next_tick`).
    pub flush_tick: Duration,

    /// Flush an entity's pending rows once their total size reaches this many bytes.
    pub flush_num_bytes: u64,

    /// Flush an entity's pending rows once their count reaches this number.
    pub flush_num_rows: u64,

    /// While building chunks, split any chunk whose time columns are unsorted
    /// once it exceeds this many rows.
    pub chunk_max_rows_if_unsorted: u64,

    /// Byte budget shared (half/half) by the batcher's input and output channels.
    /// Cannot be changed once the batcher has been created.
    pub max_bytes_in_flight: u64,
}
158
impl Default for ChunkBatcherConfig {
    /// Same as [`ChunkBatcherConfig::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}
164
165impl ChunkBatcherConfig {
166 pub const DEFAULT: Self = Self {
168 flush_tick: Duration::from_millis(200),
169 flush_num_bytes: 1024 * 1024, flush_num_rows: u64::MAX,
171 chunk_max_rows_if_unsorted: 256,
172 max_bytes_in_flight: 100 * 1024 * 1024, };
174
175 pub const LOW_LATENCY: Self = Self {
177 flush_tick: Duration::from_millis(8), ..Self::DEFAULT
179 };
180
181 pub const ALWAYS: Self = Self {
183 flush_tick: Duration::MAX,
184 flush_num_bytes: 0,
185 flush_num_rows: 0,
186 chunk_max_rows_if_unsorted: 256,
187 ..Self::DEFAULT
188 };
189
190 pub const NEVER: Self = Self {
192 flush_tick: Duration::MAX,
193 flush_num_bytes: u64::MAX,
194 flush_num_rows: u64::MAX,
195 chunk_max_rows_if_unsorted: 256,
196 ..Self::DEFAULT
197 };
198
199 pub const ENV_FLUSH_TICK: &'static str = "RERUN_FLUSH_TICK_SECS";
201
202 pub const ENV_FLUSH_NUM_BYTES: &'static str = "RERUN_FLUSH_NUM_BYTES";
204
205 pub const ENV_FLUSH_NUM_ROWS: &'static str = "RERUN_FLUSH_NUM_ROWS";
207
208 pub const ENV_CHUNK_MAX_ROWS_IF_UNSORTED: &'static str = "RERUN_CHUNK_MAX_ROWS_IF_UNSORTED";
212
213 #[deprecated(note = "use `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` instead")]
215 const ENV_MAX_CHUNK_ROWS_IF_UNSORTED: &'static str = "RERUN_MAX_CHUNK_ROWS_IF_UNSORTED";
216
217 #[inline]
222 pub fn from_env() -> ChunkBatcherResult<Self> {
223 Self::default().apply_env()
224 }
225
226 pub fn apply_env(&self) -> ChunkBatcherResult<Self> {
231 let mut new = *self;
232
233 if let Ok(s) = std::env::var(Self::ENV_FLUSH_TICK) {
234 let flush_duration_secs: f64 =
235 s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
236 name: Self::ENV_FLUSH_TICK,
237 value: s.clone(),
238 err: Box::new(err),
239 })?;
240
241 new.flush_tick = Duration::try_from_secs_f64(flush_duration_secs).map_err(|err| {
242 ChunkBatcherError::ParseConfig {
243 name: Self::ENV_FLUSH_TICK,
244 value: s.clone(),
245 err: Box::new(err),
246 }
247 })?;
248 }
249
250 if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) {
251 if let Some(num_bytes) = re_format::parse_bytes(&s) {
252 new.flush_num_bytes = num_bytes.unsigned_abs();
254 } else {
255 new.flush_num_bytes = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
257 name: Self::ENV_FLUSH_NUM_BYTES,
258 value: s.clone(),
259 err: Box::new(err),
260 })?;
261 }
262 }
263
264 if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_ROWS) {
265 new.flush_num_rows = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
266 name: Self::ENV_FLUSH_NUM_ROWS,
267 value: s.clone(),
268 err: Box::new(err),
269 })?;
270 }
271
272 if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED) {
273 new.chunk_max_rows_if_unsorted =
274 s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
275 name: Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED,
276 value: s.clone(),
277 err: Box::new(err),
278 })?;
279 }
280
281 #[expect(deprecated)]
283 if let Ok(s) = std::env::var(Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED) {
284 new.chunk_max_rows_if_unsorted =
285 s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
286 name: Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED,
287 value: s.clone(),
288 err: Box::new(err),
289 })?;
290 }
291
292 Ok(new)
293 }
294}
295
#[test]
fn chunk_batcher_config() {
    #![expect(unsafe_code)]

    // End-to-end check of `ChunkBatcherConfig::from_env`: set every supported
    // env var, then confirm the parsed config picks them all up.
    //
    // NOTE(review): `set_var` mutates process-wide state (hence `unsafe` on
    // edition 2024); this relies on no other test touching these variables
    // concurrently, and leaks them for the rest of the process.
    unsafe {
        std::env::set_var("RERUN_FLUSH_TICK_SECS", "0.3");
        std::env::set_var("RERUN_FLUSH_NUM_BYTES", "42");
        std::env::set_var("RERUN_FLUSH_NUM_ROWS", "666");
        std::env::set_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED", "7777");
    }

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 7777,
        ..Default::default()
    };
    assert_eq!(expected, config);

    // The deprecated spelling must still be honored — and it overrides the new one.
    unsafe {
        std::env::set_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED", "9999");
    }

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 9999,
        ..Default::default()
    };
    assert_eq!(expected, config);
}
334
/// Batches incoming rows into chunks on a dedicated background thread.
///
/// Cheap to clone: all clones share the same batching thread and channels,
/// which shut down once the last clone is dropped.
#[derive(Clone)]
pub struct ChunkBatcher {
    inner: Arc<ChunkBatcherInner>,
}
376
/// Shared state behind [`ChunkBatcher`]; its [`Drop`] impl shuts the batching thread down.
struct ChunkBatcherInner {
    // Commands (rows, chunks, flushes, config updates…) sent to the batching thread.
    tx_cmds: Sender<Command>,
    // Receiving end for finished chunks; `Some` until taken by `Drop`.
    rx_chunks: Option<Receiver<Chunk>>,
    // Handle to the batching thread, joined on `Drop`.
    cmds_to_chunks_handle: Option<std::thread::JoinHandle<()>>,
}
388
impl Drop for ChunkBatcherInner {
    fn drop(&mut self) {
        // Nobody will consume the output channel anymore: any chunks still in it
        // are lost, so at least make the data loss visible.
        if let Some(rx_chunks) = self.rx_chunks.take()
            && !rx_chunks.is_empty()
        {
            re_log::warn!("Dropping data");
        }

        // Ask the batching thread to terminate (it flushes its accumulators on
        // the way out), then wait for it to actually finish.
        self.tx_cmds.send(Command::Shutdown).ok();
        if let Some(handle) = self.cmds_to_chunks_handle.take() {
            handle.join().ok();
        }
    }
}
407
/// Commands handled by the batching thread.
enum Command {
    // Forward an already-built chunk straight to the output.
    AppendChunk(Chunk),
    // Append a single row to the accumulator of the given entity path.
    AppendRow(EntityPath, PendingRow),
    // Flush all accumulators; `on_done` is signaled once the flush has been processed.
    Flush {
        on_done: crossbeam::channel::Sender<()>,
    },
    // Replace the active batcher configuration.
    UpdateConfig(ChunkBatcherConfig),
    // Terminate the batching thread (pending data is flushed on the way out).
    Shutdown,
}
417
418impl re_byte_size::SizeBytes for Command {
419 fn heap_size_bytes(&self) -> u64 {
420 match self {
421 Self::AppendChunk(chunk) => chunk.heap_size_bytes(),
422 Self::AppendRow(_, row) => row.heap_size_bytes(),
423 Self::Flush { .. } | Self::UpdateConfig(_) | Self::Shutdown => 0,
424 }
425 }
426}
427
428impl Command {
429 fn flush() -> (Self, crossbeam::channel::Receiver<()>) {
430 let (tx, rx) = crossbeam::channel::bounded(1); (Self::Flush { on_done: tx }, rx)
432 }
433}
434
435impl ChunkBatcher {
436 #[must_use = "Batching threads will automatically shutdown when this object is dropped"]
441 pub fn new(config: ChunkBatcherConfig, hooks: BatcherHooks) -> ChunkBatcherResult<Self> {
442 let (tx_cmds, rx_cmd) =
443 re_quota_channel::channel("batcher_input", config.max_bytes_in_flight / 2);
444 let (tx_chunk, rx_chunks) =
445 re_quota_channel::channel("batcher_output", config.max_bytes_in_flight / 2);
446
447 let cmds_to_chunks_handle = {
448 const NAME: &str = "ChunkBatcher::cmds_to_chunks";
449 std::thread::Builder::new()
450 .name(NAME.into())
451 .spawn(move || batching_thread(config, hooks, rx_cmd, tx_chunk))
452 .map_err(|err| ChunkBatcherError::SpawnThread {
453 name: NAME,
454 err: Box::new(err),
455 })?
456 };
457
458 re_log::debug!(?config, "creating new chunk batcher");
459
460 let inner = ChunkBatcherInner {
461 tx_cmds,
462 rx_chunks: Some(rx_chunks),
463 cmds_to_chunks_handle: Some(cmds_to_chunks_handle),
464 };
465
466 Ok(Self {
467 inner: Arc::new(inner),
468 })
469 }
470
471 pub fn push_chunk(&self, chunk: Chunk) {
474 self.inner.push_chunk(chunk);
475 }
476
477 #[inline]
483 pub fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
484 self.inner.push_row(entity_path, row);
485 }
486
487 #[inline]
492 pub fn flush_async(&self) {
493 self.inner.flush_async();
494 }
495
496 #[inline]
500 pub fn flush_blocking(&self, timeout: Duration) -> Result<(), BatcherFlushError> {
501 self.inner.flush_blocking(timeout)
502 }
503
504 pub fn update_config(&self, config: ChunkBatcherConfig) {
506 self.inner.update_config(config);
507 }
508
509 pub fn chunks(&self) -> Receiver<Chunk> {
517 #[expect(clippy::unwrap_used)]
520 self.inner.rx_chunks.clone().unwrap()
521 }
522}
523
524impl ChunkBatcherInner {
525 fn push_chunk(&self, chunk: Chunk) {
526 self.send_cmd(Command::AppendChunk(chunk));
527 }
528
529 fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
530 self.send_cmd(Command::AppendRow(entity_path, row));
531 }
532
533 fn flush_async(&self) {
534 let (flush_cmd, _) = Command::flush();
535 self.send_cmd(flush_cmd);
536 }
537
538 fn flush_blocking(&self, timeout: Duration) -> Result<(), BatcherFlushError> {
539 use crossbeam::channel::RecvTimeoutError;
540
541 let (flush_cmd, on_done) = Command::flush();
542 self.send_cmd(flush_cmd);
543
544 on_done.recv_timeout(timeout).map_err(|err| match err {
545 RecvTimeoutError::Timeout => BatcherFlushError::Timeout,
546 RecvTimeoutError::Disconnected => BatcherFlushError::Closed,
547 })
548 }
549
550 fn update_config(&self, config: ChunkBatcherConfig) {
551 self.send_cmd(Command::UpdateConfig(config));
552 }
553
554 fn send_cmd(&self, cmd: Command) {
555 self.tx_cmds.send(cmd).ok();
558 }
559}
560
/// The batching thread's main loop: turns a stream of [`Command`]s into batched [`Chunk`]s.
///
/// Runs until the command channel disconnects or a [`Command::Shutdown`] arrives;
/// either way, all pending accumulators are flushed before the thread exits.
#[expect(clippy::needless_pass_by_value)]
fn batching_thread(
    mut config: ChunkBatcherConfig,
    hooks: BatcherHooks,
    rx_cmd: Receiver<Command>,
    tx_chunk: Sender<Chunk>,
) {
    let mut rx_tick = crossbeam::channel::tick(config.flush_tick);

    // Per-entity buffer of rows waiting to be batched into chunks.
    struct Accumulator {
        // When the accumulator was created or last flushed.
        // NOTE(review): written but never read anywhere in this function.
        latest: Instant,
        entity_path: EntityPath,
        pending_rows: Vec<PendingRow>,
        // Total size of `pending_rows`, for the byte-based flush threshold.
        pending_num_bytes: u64,
    }

    impl Accumulator {
        fn new(entity_path: EntityPath) -> Self {
            Self {
                entity_path,
                latest: Instant::now(),
                pending_rows: Default::default(),
                pending_num_bytes: Default::default(),
            }
        }

        fn reset(&mut self) {
            self.latest = Instant::now();
            self.pending_rows.clear();
            self.pending_num_bytes = 0;
        }
    }

    // One accumulator per entity path.
    let mut accs: IntMap<EntityPath, Accumulator> = IntMap::default();

    fn do_push_row(acc: &mut Accumulator, row: PendingRow) {
        acc.pending_num_bytes += row.total_size_bytes();
        acc.pending_rows.push(row);
    }

    // Drains one accumulator into chunks and ships them on `tx_chunk`.
    fn do_flush_all(
        acc: &mut Accumulator,
        tx_chunk: &Sender<Chunk>,
        reason: &str,
        chunk_max_rows_if_unsorted: u64,
    ) {
        let rows = std::mem::take(&mut acc.pending_rows);
        if rows.is_empty() {
            return;
        }

        re_log::trace!(
            "Flushing {} rows and {} bytes. Reason: {reason}",
            rows.len(),
            re_format::format_bytes(acc.pending_num_bytes as _)
        );

        let chunks =
            PendingRow::many_into_chunks(acc.entity_path.clone(), chunk_max_rows_if_unsorted, rows);
        for chunk in chunks {
            let chunk = match chunk {
                Ok(chunk) => chunk,
                Err(err) => {
                    // A corrupt chunk is dropped rather than poisoning the output.
                    re_log::error!(%err, "corrupt chunk detected, dropping");
                    continue;
                }
            };

            // Component-less chunks carry no data worth forwarding.
            if !chunk.components.is_empty() {
                tx_chunk.send(chunk).ok();
            } else {
                re_log::warn_once!(
                    "Dropping chunk without components. Entity path: {}",
                    chunk.entity_path()
                );
            }
        }

        acc.reset();
    }

    re_log::trace!(
        "Flushing every: {:.2}s, {} rows, {}",
        config.flush_tick.as_secs_f64(),
        config.flush_num_rows,
        re_format::format_bytes(config.flush_num_bytes as _),
    );
    // Report the initial configuration once at startup.
    if let Some(on_config_change) = hooks.on_config_change.as_ref() {
        on_config_change(&config);
    }

    // Set after any threshold-based or manual flush, so that the immediately
    // following tick doesn't flush again for no reason.
    let mut skip_next_tick = false;

    loop {
        crossbeam::select! {
            recv(rx_cmd.inner()) -> cmd => {
                let Ok(cmd) = cmd else {
                    // All senders are gone: shut down.
                    break;
                };

                let re_quota_channel::SizedMessage { msg, size_bytes } = cmd;

                // `select!` operates on the raw inner channel, so the quota
                // bookkeeping has to be updated by hand here.
                rx_cmd.manual_on_receive(size_bytes);

                match msg {
                    Command::AppendChunk(chunk) => {
                        // Pre-built chunks bypass accumulation entirely.
                        if !chunk.components.is_empty() {
                            tx_chunk.send(chunk).ok();
                        } else {
                            re_log::warn_once!(
                                "Dropping chunk without components. Entity path: {}",
                                chunk.entity_path()
                            );
                        }
                    },
                    Command::AppendRow(entity_path, row) => {
                        let acc = accs.entry(entity_path.clone())
                            .or_insert_with(|| Accumulator::new(entity_path));
                        do_push_row(acc, row);

                        // NOTE(review): this binding shadows the outer `config`
                        // (the batcher configuration) — it is the `on_insert` hook.
                        if let Some(config) = hooks.on_insert.as_ref() {
                            config(&acc.pending_rows);
                        }

                        if acc.pending_rows.len() as u64 >= config.flush_num_rows {
                            do_flush_all(acc, &tx_chunk, "rows", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        } else if acc.pending_num_bytes >= config.flush_num_bytes {
                            do_flush_all(acc, &tx_chunk, "bytes", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        }
                    },

                    Command::Flush{ on_done } => {
                        skip_next_tick = true;
                        for acc in accs.values_mut() {
                            do_flush_all(acc, &tx_chunk, "manual", config.chunk_max_rows_if_unsorted);
                        }
                        // Ack the flush; receiver may be gone (fire-and-forget flush).
                        on_done.send(()).ok();
                    },

                    Command::UpdateConfig(new_config) => {
                        // The channel capacities were fixed at construction time.
                        if config.max_bytes_in_flight != new_config.max_bytes_in_flight {
                            re_log::warn!("Cannot change max_bytes_in_flight after batcher has been created. \
                                Previous max_bytes_in_flight: {:?}, new max_bytes_in_flight: {:?}",
                                config.max_bytes_in_flight, new_config.max_bytes_in_flight);
                        }

                        re_log::trace!("Updated batcher config: {:?}", new_config);
                        if let Some(on_config_change) = hooks.on_config_change.as_ref() {
                            on_config_change(&new_config);
                        }

                        config = new_config;
                        // Restart the ticker so the new tick period takes effect.
                        rx_tick = crossbeam::channel::tick(config.flush_tick);
                    }

                    Command::Shutdown => break,
                }
            },

            recv(rx_tick) -> _ => {
                if skip_next_tick {
                    // A flush just happened; don't flush again immediately.
                    skip_next_tick = false;
                } else {
                    for acc in accs.values_mut() {
                        do_flush_all(acc, &tx_chunk, "tick", config.chunk_max_rows_if_unsorted);
                    }
                }
            },
        };
    }

    // Shutting down: close the input first, then flush everything that's left.
    drop(rx_cmd);
    for acc in accs.values_mut() {
        do_flush_all(
            acc,
            &tx_chunk,
            "shutdown",
            config.chunk_max_rows_if_unsorted,
        );
    }
    // Closing the output channel signals downstream consumers that we're done.
    drop(tx_chunk);

}
764
/// A single row of data destined for one entity, waiting to be batched into a [`Chunk`].
#[derive(Debug, Clone)]
pub struct PendingRow {
    /// Unique identifier of the row; rows are (re)ordered by it during batching.
    pub row_id: RowId,

    /// The time(s) this row applies to; empty for static data.
    pub timepoint: TimePoint,

    /// The row's payload: one serialized batch per component.
    pub components: IntMap<ComponentIdentifier, SerializedComponentBatch>,
}
784
785impl PendingRow {
786 #[inline]
787 pub fn new(
788 timepoint: TimePoint,
789 components: IntMap<ComponentIdentifier, SerializedComponentBatch>,
790 ) -> Self {
791 Self {
792 row_id: RowId::new(),
793 timepoint,
794 components,
795 }
796 }
797
798 #[inline]
799 pub fn from_iter(
800 timepoint: TimePoint,
801 components: impl IntoIterator<Item = SerializedComponentBatch>,
802 ) -> Self {
803 Self::new(
804 timepoint,
805 components
806 .into_iter()
807 .map(|component| (component.descriptor.component, component))
808 .collect(),
809 )
810 }
811}
812
813impl re_byte_size::SizeBytes for PendingRow {
814 #[inline]
815 fn heap_size_bytes(&self) -> u64 {
816 let Self {
817 row_id,
818 timepoint,
819 components,
820 } = self;
821
822 row_id.heap_size_bytes() + timepoint.heap_size_bytes() + components.heap_size_bytes()
823 }
824}
825
impl PendingRow {
    /// Turns a single row into a [`Chunk`] of its own.
    ///
    /// Each timeline becomes a one-entry time column, and each component batch
    /// becomes a one-element list-array column.
    pub fn into_chunk(self, entity_path: EntityPath) -> ChunkResult<Chunk> {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        // One time column per timeline, each holding exactly one timestamp.
        let timelines = timepoint
            .into_iter()
            .map(|(timeline_name, cell)| {
                let times = ArrowScalarBuffer::from(vec![cell.as_i64()]);
                let time_column =
                    TimeColumn::new(Some(true), Timeline::new(timeline_name, cell.typ()), times);
                (timeline_name, time_column)
            })
            .collect();

        // Wrap each component batch in a single-element list array; batches that
        // cannot be wrapped are silently skipped.
        let mut per_desc = ChunkComponents::default();
        for (_component, batch) in components {
            let list_array = arrays_to_list_array_opt(&[Some(&*batch.array as _)]);
            if let Some(list_array) = list_array {
                per_desc.insert(SerializedComponentColumn::new(list_array, batch.descriptor));
            }
        }

        Chunk::from_native_row_ids(
            ChunkId::new(),
            entity_path,
            Some(true), // a single row is trivially sorted
            &[row_id],
            timelines,
            per_desc,
        )
    }

    /// Turns many rows for one entity into as few [`Chunk`]s as possible.
    ///
    /// Rows are grouped first by the set of timelines they touch, then by the
    /// Arrow datatypes of their component batches — rows can only share a chunk
    /// when both match. Within a group, a chunk is additionally split once it
    /// holds `chunk_max_rows_if_unsorted` rows while any of its time columns is
    /// unsorted.
    pub fn many_into_chunks(
        entity_path: EntityPath,
        chunk_max_rows_if_unsorted: u64,
        mut rows: Vec<Self>,
    ) -> impl Iterator<Item = ChunkResult<Chunk>> {
        re_tracing::profile_function!();

        {
            re_tracing::profile_scope!("sort rows");
            rows.sort_by_key(|row| row.row_id);
        }

        // Group rows by the (hashed) set of timelines they touch.
        let mut per_timeline_set: IntMap<u64 /* timeline-set hash */, Vec<Self>> = Default::default();
        {
            re_tracing::profile_scope!("compute timeline sets");

            for row in rows {
                let mut hasher = ahash::AHasher::default();
                row.timepoint.timeline_names().for_each(|timeline| {
                    <TimelineName as std::hash::Hash>::hash(timeline, &mut hasher);
                });

                per_timeline_set
                    .entry(hasher.finish())
                    .or_default()
                    .push(row);
            }
        }

        per_timeline_set.into_values().flat_map(move |rows| {
            re_tracing::profile_scope!("iterate per timeline set");

            // Within a timeline set, further group rows by the (hashed) Arrow
            // datatypes of their component batches.
            //
            // NOTE(review): the hash covers `row.components.values()` in map
            // iteration order, so equal datatype sets may still hash differently
            // (see the `simple_but_hashes_might_not_match` test).
            let mut per_datatype_set: IntMap<u64 /* datatype-set hash */, Vec<Self>> =
                Default::default();
            {
                re_tracing::profile_scope!("compute datatype sets");

                for row in rows {
                    let mut hasher = ahash::AHasher::default();
                    row.components
                        .values()
                        .for_each(|batch| batch.array.data_type().hash(&mut hasher));
                    per_datatype_set
                        .entry(hasher.finish())
                        .or_default()
                        .push(row);
                }
            }

            let entity_path = entity_path.clone();
            per_datatype_set.into_values().flat_map(move |rows| {
                re_tracing::profile_scope!("iterate per datatype set");

                let mut row_ids: Vec<RowId> = Vec::with_capacity(rows.len());
                let mut timelines: IntMap<TimelineName, PendingTimeColumn> = IntMap::default();

                // Union of all components present in this group, so that every
                // chunk gets one column per component (entries are `None` where
                // a row is missing that component).
                let mut all_components: IntMap<ComponentIdentifier, _> = IntMap::default();
                for row in &rows {
                    for (component, batch) in &row.components {
                        all_components
                            .entry(*component)
                            .or_insert_with(|| (batch.descriptor.clone(), Vec::new()));
                    }
                }

                let mut chunks = Vec::new();

                let mut components = all_components.clone();
                for row in &rows {
                    let Self {
                        row_id,
                        timepoint: row_timepoint,
                        components: row_components,
                    } = row;

                    for (&timeline_name, cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });

                        // Split unsorted chunks eagerly once they grow past the
                        // configured row limit.
                        if !row_ids.is_empty() && row_ids.len() as u64 >= chunk_max_rows_if_unsorted
                            && !time_column.is_sorted
                        {
                            chunks.push(Chunk::from_native_row_ids(
                                ChunkId::new(),
                                entity_path.clone(),
                                Some(true), // rows were sorted by `RowId` above
                                &std::mem::take(&mut row_ids),
                                std::mem::take(&mut timelines)
                                    .into_iter()
                                    .map(|(name, time_column)| (name, time_column.finish()))
                                    .collect(),
                                {
                                    let mut per_component = ChunkComponents::default();
                                    for (_component, (desc, arrays)) in
                                        std::mem::take(&mut components)
                                    {
                                        let list_array = arrays_to_list_array_opt(&arrays);
                                        if let Some(list_array) = list_array {
                                            per_component.insert(SerializedComponentColumn::new(
                                                list_array, desc,
                                            ));
                                        }
                                    }
                                    per_component
                                },
                            ));

                            // Start the next chunk with a fresh component union.
                            components = all_components.clone();
                        }
                    }

                    row_ids.push(*row_id);

                    for (&timeline_name, &cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });
                        time_column.push(cell.into());
                    }

                    // Every component column gets an entry for this row — `None`
                    // where the row doesn't carry that component.
                    for (component, (_desc, arrays)) in &mut components {
                        arrays.push(
                            row_components
                                .get(component)
                                .map(|batch| &*batch.array as _),
                        );
                    }
                }

                // Emit whatever is left over as the final chunk.
                chunks.push(Chunk::from_native_row_ids(
                    ChunkId::new(),
                    entity_path.clone(),
                    Some(true),
                    &std::mem::take(&mut row_ids),
                    timelines
                        .into_iter()
                        .map(|(timeline, time_column)| (timeline, time_column.finish()))
                        .collect(),
                    {
                        let mut per_desc = ChunkComponents::default();
                        for (_component, (desc, arrays)) in components {
                            let list_array = arrays_to_list_array_opt(&arrays);
                            if let Some(list_array) = list_array {
                                per_desc.insert(SerializedComponentColumn::new(list_array, desc));
                            }
                        }
                        per_desc
                    },
                ));

                chunks
            })
        })
    }
}
1064
/// Incrementally accumulates time values for one timeline while chunks are being built.
struct PendingTimeColumn {
    timeline: Timeline,
    // Raw time values, in insertion order.
    times: Vec<i64>,
    // Whether `times` is (still) sorted in ascending order.
    is_sorted: bool,
    // Running min/max over everything pushed so far.
    time_range: AbsoluteTimeRange,
}
1074
1075impl PendingTimeColumn {
1076 fn new(timeline: Timeline) -> Self {
1077 Self {
1078 timeline,
1079 times: Default::default(),
1080 is_sorted: true,
1081 time_range: AbsoluteTimeRange::EMPTY,
1082 }
1083 }
1084
1085 fn push(&mut self, time: TimeInt) {
1087 let Self {
1088 timeline: _,
1089 times,
1090 is_sorted,
1091 time_range,
1092 } = self;
1093
1094 *is_sorted &= times.last().copied().unwrap_or(TimeInt::MIN.as_i64()) <= time.as_i64();
1095 time_range.set_min(TimeInt::min(time_range.min(), time));
1096 time_range.set_max(TimeInt::max(time_range.max(), time));
1097 times.push(time.as_i64());
1098 }
1099
1100 fn finish(self) -> TimeColumn {
1101 let Self {
1102 timeline,
1103 times,
1104 is_sorted,
1105 time_range,
1106 } = self;
1107
1108 TimeColumn {
1109 timeline,
1110 times: ArrowScalarBuffer::from(times),
1111 is_sorted,
1112 time_range,
1113 }
1114 }
1115}
1116
1117#[cfg(test)]
1124mod tests {
1125 use crossbeam::channel::TryRecvError;
1126 use re_log_types::example_components::{MyIndex, MyLabel, MyPoint, MyPoint64, MyPoints};
1127 use re_types_core::{ComponentDescriptor, Loggable as _};
1128
1129 use super::*;
1130
    /// End-to-end: three rows on the same timeline and entity, all with the same
    /// component set in the same order, must coalesce into one sorted chunk.
    #[test]
    fn simple() -> anyhow::Result<()> {
        // NEVER: only the shutdown flush (on drop) produces chunks.
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        // Identical component ordering across all rows.
        let components1 = [
            SerializedComponentBatch::new(points1.clone(), MyPoints::descriptor_points()),
            SerializedComponentBatch::new(labels1.clone(), MyPoints::descriptor_labels()),
            SerializedComponentBatch::new(indices1.clone(), MyIndex::partial_descriptor()),
        ];
        let components2 = [
            SerializedComponentBatch::new(points2.clone(), MyPoints::descriptor_points()),
            SerializedComponentBatch::new(labels2.clone(), MyPoints::descriptor_labels()),
            SerializedComponentBatch::new(indices2.clone(), MyIndex::partial_descriptor()),
        ];
        let components3 = [
            SerializedComponentBatch::new(points3.clone(), MyPoints::descriptor_points()),
            SerializedComponentBatch::new(labels3.clone(), MyPoints::descriptor_labels()),
            SerializedComponentBatch::new(indices3.clone(), MyIndex::partial_descriptor()),
        ];

        let row1 = PendingRow::from_iter(timepoint1.clone(), components1);
        let row2 = PendingRow::from_iter(timepoint2.clone(), components2);
        let row3 = PendingRow::from_iter(timepoint3.clone(), components3);

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        // Dropping the batcher flushes everything and closes the output channel.
        drop(batcher);

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        // All three rows must have been coalesced into a single chunk.
        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 43, 44].into()),
            )];
            let expected_components = [
                (
                    MyPoints::descriptor_points(),
                    arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
                ),
                (
                    MyPoints::descriptor_labels(),
                    arrays_to_list_array_opt(&[&*labels1, &*labels2, &*labels3].map(Some)).unwrap(),
                ),
                (
                    MyIndex::partial_descriptor(),
                    arrays_to_list_array_opt(&[&*indices1, &*indices2, &*indices3].map(Some))
                        .unwrap(),
                ),
            ];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }
1239
    /// Like `simple`, but the component ordering differs from row to row, so the
    /// per-datatype hashing in `many_into_chunks` may or may not group all rows
    /// together — only the lower bound on the chunk count can be asserted.
    #[test]
    #[expect(clippy::len_zero)]
    fn simple_but_hashes_might_not_match() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        // Deliberately different component ordering per row.
        let components1 = [
            SerializedComponentBatch::new(indices1.clone(), MyIndex::partial_descriptor()),
            SerializedComponentBatch::new(points1.clone(), MyPoints::descriptor_points()),
            SerializedComponentBatch::new(labels1.clone(), MyPoints::descriptor_labels()),
        ];
        let components2 = [
            SerializedComponentBatch::new(points2.clone(), MyPoints::descriptor_points()),
            SerializedComponentBatch::new(labels2.clone(), MyPoints::descriptor_labels()),
            SerializedComponentBatch::new(indices2.clone(), MyIndex::partial_descriptor()),
        ];
        let components3 = [
            SerializedComponentBatch::new(labels3.clone(), MyPoints::descriptor_labels()),
            SerializedComponentBatch::new(indices3.clone(), MyIndex::partial_descriptor()),
            SerializedComponentBatch::new(points3.clone(), MyPoints::descriptor_points()),
        ];

        let row1 = PendingRow::from_iter(timepoint1.clone(), components1);
        let row2 = PendingRow::from_iter(timepoint2.clone(), components2);
        let row3 = PendingRow::from_iter(timepoint3.clone(), components3);

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        // Dropping the batcher flushes everything and closes the output channel.
        drop(batcher);

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        // Whether the rows end up in 1, 2 or 3 chunks depends on map iteration
        // order during datatype hashing — only assert that nothing was lost.
        assert!(chunks.len() >= 1);

        Ok(())
    }
1320
1321 #[test]
1322 #[expect(clippy::zero_sized_map_values)]
1323 fn intmap_order_is_deterministic() {
1324 let descriptors = [
1325 MyPoints::descriptor_points(),
1326 MyPoints::descriptor_colors(),
1327 MyPoints::descriptor_labels(),
1328 MyPoint64::partial_descriptor(),
1329 MyIndex::partial_descriptor(),
1330 ];
1331
1332 let expected: IntMap<ComponentDescriptor, ()> =
1333 descriptors.iter().cloned().map(|d| (d, ())).collect();
1334 let expected: Vec<_> = expected.into_keys().collect();
1335
1336 for _ in 0..1_000 {
1337 let got_collect: IntMap<ComponentDescriptor, ()> =
1338 descriptors.clone().into_iter().map(|d| (d, ())).collect();
1339 let got_collect: Vec<_> = got_collect.into_keys().collect();
1340
1341 let mut got_insert: IntMap<ComponentDescriptor, ()> = Default::default();
1342 for d in descriptors.clone() {
1343 got_insert.insert(d, ());
1344 }
1345 let got_insert: Vec<_> = got_insert.into_keys().collect();
1346
1347 assert_eq!(expected, got_collect);
1348 assert_eq!(expected, got_insert);
1349 }
1350 }
1351
1352 #[test]
1354 fn simple_static() -> anyhow::Result<()> {
1355 let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;
1356
1357 let static_ = TimePoint::default();
1358
1359 let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1360 let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
1361 let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1362
1363 let components1 = [SerializedComponentBatch::new(
1364 points1.clone(),
1365 MyPoints::descriptor_points(),
1366 )];
1367 let components2 = [SerializedComponentBatch::new(
1368 points2.clone(),
1369 MyPoints::descriptor_points(),
1370 )];
1371 let components3 = [SerializedComponentBatch::new(
1372 points3.clone(),
1373 MyPoints::descriptor_points(),
1374 )];
1375
1376 let row1 = PendingRow::from_iter(static_.clone(), components1);
1377 let row2 = PendingRow::from_iter(static_.clone(), components2);
1378 let row3 = PendingRow::from_iter(static_.clone(), components3);
1379
1380 let entity_path1: EntityPath = "a/b/c".into();
1381 batcher.push_row(entity_path1.clone(), row1.clone());
1382 batcher.push_row(entity_path1.clone(), row2.clone());
1383 batcher.push_row(entity_path1.clone(), row3.clone());
1384
1385 let chunks_rx = batcher.chunks();
1386 drop(batcher); let mut chunks = Vec::new();
1389 loop {
1390 let chunk = match chunks_rx.try_recv() {
1391 Ok(chunk) => chunk,
1392 Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1393 Err(TryRecvError::Disconnected) => break,
1394 };
1395 chunks.push(chunk);
1396 }
1397
1398 chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1399
1400 eprintln!("Chunks:");
1402 for chunk in &chunks {
1403 eprintln!("{chunk}");
1404 }
1405
1406 assert_eq!(1, chunks.len());
1407
1408 {
1409 let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
1410 let expected_timelines = [];
1411 let expected_components = [(
1412 MyPoints::descriptor_points(),
1413 arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
1414 )];
1415 let expected_chunk = Chunk::from_native_row_ids(
1416 chunks[0].id,
1417 entity_path1.clone(),
1418 None,
1419 &expected_row_ids,
1420 expected_timelines.into_iter().collect(),
1421 expected_components.into_iter().collect(),
1422 )?;
1423
1424 eprintln!("Expected:\n{expected_chunk}");
1425 eprintln!("Got:\n{}", chunks[0]);
1426 assert_eq!(expected_chunk, chunks[0]);
1427 }
1428
1429 Ok(())
1430 }
1431
1432 #[test]
1434 fn different_entities() -> anyhow::Result<()> {
1435 let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;
1436
1437 let timeline1 = Timeline::new_duration("log_time");
1438
1439 let timepoint1 = TimePoint::default().with(timeline1, 42);
1440 let timepoint2 = TimePoint::default().with(timeline1, 43);
1441 let timepoint3 = TimePoint::default().with(timeline1, 44);
1442
1443 let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1444 let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
1445 let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1446
1447 let components1 = [SerializedComponentBatch::new(
1448 points1.clone(),
1449 MyPoints::descriptor_points(),
1450 )];
1451 let components2 = [SerializedComponentBatch::new(
1452 points2.clone(),
1453 MyPoints::descriptor_points(),
1454 )];
1455 let components3 = [SerializedComponentBatch::new(
1456 points3.clone(),
1457 MyPoints::descriptor_points(),
1458 )];
1459
1460 let row1 = PendingRow::from_iter(timepoint1.clone(), components1);
1461 let row2 = PendingRow::from_iter(timepoint2.clone(), components2);
1462 let row3 = PendingRow::from_iter(timepoint3.clone(), components3);
1463
1464 let entity_path1: EntityPath = "ent1".into();
1465 let entity_path2: EntityPath = "ent2".into();
1466 batcher.push_row(entity_path1.clone(), row1.clone());
1467 batcher.push_row(entity_path2.clone(), row2.clone());
1468 batcher.push_row(entity_path1.clone(), row3.clone());
1469
1470 let chunks_rx = batcher.chunks();
1471 drop(batcher); let mut chunks = Vec::new();
1474 loop {
1475 let chunk = match chunks_rx.try_recv() {
1476 Ok(chunk) => chunk,
1477 Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1478 Err(TryRecvError::Disconnected) => break,
1479 };
1480 chunks.push(chunk);
1481 }
1482
1483 chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1484
1485 eprintln!("Chunks:");
1487 for chunk in &chunks {
1488 eprintln!("{chunk}");
1489 }
1490
1491 assert_eq!(2, chunks.len());
1492
1493 {
1494 let expected_row_ids = vec![row1.row_id, row3.row_id];
1495 let expected_timelines = [(
1496 *timeline1.name(),
1497 TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
1498 )];
1499 let expected_components = [(
1500 MyPoints::descriptor_points(),
1501 arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
1502 )];
1503 let expected_chunk = Chunk::from_native_row_ids(
1504 chunks[0].id,
1505 entity_path1.clone(),
1506 None,
1507 &expected_row_ids,
1508 expected_timelines.into_iter().collect(),
1509 expected_components.into_iter().collect(),
1510 )?;
1511
1512 eprintln!("Expected:\n{expected_chunk}");
1513 eprintln!("Got:\n{}", chunks[0]);
1514 assert_eq!(expected_chunk, chunks[0]);
1515 }
1516
1517 {
1518 let expected_row_ids = vec![row2.row_id];
1519 let expected_timelines = [(
1520 *timeline1.name(),
1521 TimeColumn::new(Some(true), timeline1, vec![43].into()),
1522 )];
1523 let expected_components = [(
1524 MyPoints::descriptor_points(),
1525 arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
1526 )];
1527 let expected_chunk = Chunk::from_native_row_ids(
1528 chunks[1].id,
1529 entity_path2.clone(),
1530 None,
1531 &expected_row_ids,
1532 expected_timelines.into_iter().collect(),
1533 expected_components.into_iter().collect(),
1534 )?;
1535
1536 eprintln!("Expected:\n{expected_chunk}");
1537 eprintln!("Got:\n{}", chunks[1]);
1538 assert_eq!(expected_chunk, chunks[1]);
1539 }
1540
1541 Ok(())
1542 }
1543
1544 #[test]
1546 fn different_timelines() -> anyhow::Result<()> {
1547 let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;
1548
1549 let timeline1 = Timeline::new_duration("log_time");
1550 let timeline2 = Timeline::new_sequence("frame_nr");
1551
1552 let timepoint1 = TimePoint::default().with(timeline1, 42);
1553 let timepoint2 = TimePoint::default()
1554 .with(timeline1, 43)
1555 .with(timeline2, 1000);
1556 let timepoint3 = TimePoint::default()
1557 .with(timeline1, 44)
1558 .with(timeline2, 1001);
1559
1560 let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1561 let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
1562 let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1563
1564 let components1 = [SerializedComponentBatch::new(
1565 points1.clone(),
1566 MyPoints::descriptor_points(),
1567 )];
1568 let components2 = [SerializedComponentBatch::new(
1569 points2.clone(),
1570 MyPoints::descriptor_points(),
1571 )];
1572 let components3 = [SerializedComponentBatch::new(
1573 points3.clone(),
1574 MyPoints::descriptor_points(),
1575 )];
1576
1577 let row1 = PendingRow::from_iter(timepoint1.clone(), components1);
1578 let row2 = PendingRow::from_iter(timepoint2.clone(), components2);
1579 let row3 = PendingRow::from_iter(timepoint3.clone(), components3);
1580
1581 let entity_path1: EntityPath = "a/b/c".into();
1582 batcher.push_row(entity_path1.clone(), row1.clone());
1583 batcher.push_row(entity_path1.clone(), row2.clone());
1584 batcher.push_row(entity_path1.clone(), row3.clone());
1585
1586 let chunks_rx = batcher.chunks();
1587 drop(batcher); let mut chunks = Vec::new();
1590 loop {
1591 let chunk = match chunks_rx.try_recv() {
1592 Ok(chunk) => chunk,
1593 Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1594 Err(TryRecvError::Disconnected) => break,
1595 };
1596 chunks.push(chunk);
1597 }
1598
1599 chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1600
1601 eprintln!("Chunks:");
1603 for chunk in &chunks {
1604 eprintln!("{chunk}");
1605 }
1606
1607 assert_eq!(2, chunks.len());
1608
1609 {
1610 let expected_row_ids = vec![row1.row_id];
1611 let expected_timelines = [(
1612 *timeline1.name(),
1613 TimeColumn::new(Some(true), timeline1, vec![42].into()),
1614 )];
1615 let expected_components = [(
1616 MyPoints::descriptor_points(),
1617 arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(),
1618 )];
1619 let expected_chunk = Chunk::from_native_row_ids(
1620 chunks[0].id,
1621 entity_path1.clone(),
1622 None,
1623 &expected_row_ids,
1624 expected_timelines.into_iter().collect(),
1625 expected_components.into_iter().collect(),
1626 )?;
1627
1628 eprintln!("Expected:\n{expected_chunk}");
1629 eprintln!("Got:\n{}", chunks[0]);
1630 assert_eq!(expected_chunk, chunks[0]);
1631 }
1632
1633 {
1634 let expected_row_ids = vec![row2.row_id, row3.row_id];
1635 let expected_timelines = [
1636 (
1637 *timeline1.name(),
1638 TimeColumn::new(Some(true), timeline1, vec![43, 44].into()),
1639 ),
1640 (
1641 *timeline2.name(),
1642 TimeColumn::new(Some(true), timeline2, vec![1000, 1001].into()),
1643 ),
1644 ];
1645 let expected_components = [(
1646 MyPoints::descriptor_points(),
1647 arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(),
1648 )];
1649 let expected_chunk = Chunk::from_native_row_ids(
1650 chunks[1].id,
1651 entity_path1.clone(),
1652 None,
1653 &expected_row_ids,
1654 expected_timelines.into_iter().collect(),
1655 expected_components.into_iter().collect(),
1656 )?;
1657
1658 eprintln!("Expected:\n{expected_chunk}");
1659 eprintln!("Got:\n{}", chunks[1]);
1660 assert_eq!(expected_chunk, chunks[1]);
1661 }
1662
1663 Ok(())
1664 }
1665
1666 #[test]
1668 fn different_datatypes() -> anyhow::Result<()> {
1669 let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;
1670
1671 let timeline1 = Timeline::new_duration("log_time");
1672
1673 let timepoint1 = TimePoint::default().with(timeline1, 42);
1674 let timepoint2 = TimePoint::default().with(timeline1, 43);
1675 let timepoint3 = TimePoint::default().with(timeline1, 44);
1676
1677 let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1678 let points2 =
1679 MyPoint64::to_arrow([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?;
1680 let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1681
1682 let components1 = [SerializedComponentBatch::new(
1683 points1.clone(),
1684 MyPoints::descriptor_points(),
1685 )];
1686 let components2 = [SerializedComponentBatch::new(
1687 points2.clone(),
1688 MyPoints::descriptor_points(),
1689 )]; let components3 = [SerializedComponentBatch::new(
1691 points3.clone(),
1692 MyPoints::descriptor_points(),
1693 )];
1694
1695 let row1 = PendingRow::from_iter(timepoint1.clone(), components1);
1696 let row2 = PendingRow::from_iter(timepoint2.clone(), components2);
1697 let row3 = PendingRow::from_iter(timepoint3.clone(), components3);
1698
1699 let entity_path1: EntityPath = "a/b/c".into();
1700 batcher.push_row(entity_path1.clone(), row1.clone());
1701 batcher.push_row(entity_path1.clone(), row2.clone());
1702 batcher.push_row(entity_path1.clone(), row3.clone());
1703
1704 let chunks_rx = batcher.chunks();
1705 drop(batcher); let mut chunks = Vec::new();
1708 loop {
1709 let chunk = match chunks_rx.try_recv() {
1710 Ok(chunk) => chunk,
1711 Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1712 Err(TryRecvError::Disconnected) => break,
1713 };
1714 chunks.push(chunk);
1715 }
1716
1717 chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1718
1719 eprintln!("Chunks:");
1721 for chunk in &chunks {
1722 eprintln!("{chunk}");
1723 }
1724
1725 assert_eq!(2, chunks.len());
1726
1727 {
1728 let expected_row_ids = vec![row1.row_id, row3.row_id];
1729 let expected_timelines = [(
1730 *timeline1.name(),
1731 TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
1732 )];
1733 let expected_components = [(
1734 MyPoints::descriptor_points(),
1735 arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
1736 )];
1737 let expected_chunk = Chunk::from_native_row_ids(
1738 chunks[0].id,
1739 entity_path1.clone(),
1740 None,
1741 &expected_row_ids,
1742 expected_timelines.into_iter().collect(),
1743 expected_components.into_iter().collect(),
1744 )?;
1745
1746 eprintln!("Expected:\n{expected_chunk}");
1747 eprintln!("Got:\n{}", chunks[0]);
1748 assert_eq!(expected_chunk, chunks[0]);
1749 }
1750
1751 {
1752 let expected_row_ids = vec![row2.row_id];
1753 let expected_timelines = [(
1754 *timeline1.name(),
1755 TimeColumn::new(Some(true), timeline1, vec![43].into()),
1756 )];
1757 let expected_components = [(
1758 MyPoints::descriptor_points(),
1759 arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
1760 )];
1761 let expected_chunk = Chunk::from_native_row_ids(
1762 chunks[1].id,
1763 entity_path1.clone(),
1764 None,
1765 &expected_row_ids,
1766 expected_timelines.into_iter().collect(),
1767 expected_components.into_iter().collect(),
1768 )?;
1769
1770 eprintln!("Expected:\n{expected_chunk}");
1771 eprintln!("Got:\n{}", chunks[1]);
1772 assert_eq!(expected_chunk, chunks[1]);
1773 }
1774
1775 Ok(())
1776 }
1777
1778 #[test]
1781 fn unsorted_timeline_below_threshold() -> anyhow::Result<()> {
1782 let batcher = ChunkBatcher::new(
1783 ChunkBatcherConfig {
1784 chunk_max_rows_if_unsorted: 1000,
1785 ..ChunkBatcherConfig::NEVER
1786 },
1787 BatcherHooks::NONE,
1788 )?;
1789
1790 let timeline1 = Timeline::new_duration("log_time");
1791 let timeline2 = Timeline::new_duration("frame_nr");
1792
1793 let timepoint1 = TimePoint::default()
1794 .with(timeline2, 1000)
1795 .with(timeline1, 42);
1796 let timepoint2 = TimePoint::default()
1797 .with(timeline2, 1001)
1798 .with(timeline1, 43);
1799 let timepoint3 = TimePoint::default()
1800 .with(timeline2, 1002)
1801 .with(timeline1, 44);
1802 let timepoint4 = TimePoint::default()
1803 .with(timeline2, 1003)
1804 .with(timeline1, 45);
1805
1806 let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1807 let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
1808 let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1809 let points4 =
1810 MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;
1811
1812 let components1 = [SerializedComponentBatch::new(
1813 points1.clone(),
1814 MyPoints::descriptor_points(),
1815 )];
1816 let components2 = [SerializedComponentBatch::new(
1817 points2.clone(),
1818 MyPoints::descriptor_points(),
1819 )];
1820 let components3 = [SerializedComponentBatch::new(
1821 points3.clone(),
1822 MyPoints::descriptor_points(),
1823 )];
1824 let components4 = [SerializedComponentBatch::new(
1825 points4.clone(),
1826 MyPoints::descriptor_points(),
1827 )];
1828
1829 let row1 = PendingRow::from_iter(timepoint4.clone(), components1);
1830 let row2 = PendingRow::from_iter(timepoint1.clone(), components2);
1831 let row3 = PendingRow::from_iter(timepoint2.clone(), components3);
1832 let row4 = PendingRow::from_iter(timepoint3.clone(), components4);
1833
1834 let entity_path1: EntityPath = "a/b/c".into();
1835 batcher.push_row(entity_path1.clone(), row1.clone());
1836 batcher.push_row(entity_path1.clone(), row2.clone());
1837 batcher.push_row(entity_path1.clone(), row3.clone());
1838 batcher.push_row(entity_path1.clone(), row4.clone());
1839
1840 let chunks_rx = batcher.chunks();
1841 drop(batcher); let mut chunks = Vec::new();
1844 loop {
1845 let chunk = match chunks_rx.try_recv() {
1846 Ok(chunk) => chunk,
1847 Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1848 Err(TryRecvError::Disconnected) => break,
1849 };
1850 chunks.push(chunk);
1851 }
1852
1853 chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1854
1855 eprintln!("Chunks:");
1857 for chunk in &chunks {
1858 eprintln!("{chunk}");
1859 }
1860
1861 assert_eq!(1, chunks.len());
1862
1863 {
1864 let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id, row4.row_id];
1865 let expected_timelines = [
1866 (
1867 *timeline1.name(),
1868 TimeColumn::new(Some(false), timeline1, vec![45, 42, 43, 44].into()),
1869 ),
1870 (
1871 *timeline2.name(),
1872 TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001, 1002].into()),
1873 ),
1874 ];
1875 let expected_components = [(
1876 MyPoints::descriptor_points(),
1877 arrays_to_list_array_opt(&[&*points1, &*points2, &*points3, &*points4].map(Some))
1878 .unwrap(),
1879 )];
1880 let expected_chunk = Chunk::from_native_row_ids(
1881 chunks[0].id,
1882 entity_path1.clone(),
1883 None,
1884 &expected_row_ids,
1885 expected_timelines.into_iter().collect(),
1886 expected_components.into_iter().collect(),
1887 )?;
1888
1889 eprintln!("Expected:\n{expected_chunk}");
1890 eprintln!("Got:\n{}", chunks[0]);
1891 assert_eq!(expected_chunk, chunks[0]);
1892 }
1893
1894 Ok(())
1895 }
1896
1897 #[test]
1900 fn unsorted_timeline_above_threshold() -> anyhow::Result<()> {
1901 let batcher = ChunkBatcher::new(
1902 ChunkBatcherConfig {
1903 chunk_max_rows_if_unsorted: 3,
1904 ..ChunkBatcherConfig::NEVER
1905 },
1906 BatcherHooks::NONE,
1907 )?;
1908
1909 let timeline1 = Timeline::new_duration("log_time");
1910 let timeline2 = Timeline::new_duration("frame_nr");
1911
1912 let timepoint1 = TimePoint::default()
1913 .with(timeline2, 1000)
1914 .with(timeline1, 42);
1915 let timepoint2 = TimePoint::default()
1916 .with(timeline2, 1001)
1917 .with(timeline1, 43);
1918 let timepoint3 = TimePoint::default()
1919 .with(timeline2, 1002)
1920 .with(timeline1, 44);
1921 let timepoint4 = TimePoint::default()
1922 .with(timeline2, 1003)
1923 .with(timeline1, 45);
1924
1925 let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1926 let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
1927 let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1928 let points4 =
1929 MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;
1930
1931 let components1 = [SerializedComponentBatch::new(
1932 points1.clone(),
1933 MyPoints::descriptor_points(),
1934 )];
1935 let components2 = [SerializedComponentBatch::new(
1936 points2.clone(),
1937 MyPoints::descriptor_points(),
1938 )];
1939 let components3 = [SerializedComponentBatch::new(
1940 points3.clone(),
1941 MyPoints::descriptor_points(),
1942 )];
1943 let components4 = [SerializedComponentBatch::new(
1944 points4.clone(),
1945 MyPoints::descriptor_points(),
1946 )];
1947
1948 let row1 = PendingRow::from_iter(timepoint4.clone(), components1);
1949 let row2 = PendingRow::from_iter(timepoint1.clone(), components2);
1950 let row3 = PendingRow::from_iter(timepoint2.clone(), components3);
1951 let row4 = PendingRow::from_iter(timepoint3.clone(), components4);
1952
1953 let entity_path1: EntityPath = "a/b/c".into();
1954 batcher.push_row(entity_path1.clone(), row1.clone());
1955 batcher.push_row(entity_path1.clone(), row2.clone());
1956 batcher.push_row(entity_path1.clone(), row3.clone());
1957 batcher.push_row(entity_path1.clone(), row4.clone());
1958
1959 let chunks_rx = batcher.chunks();
1960 drop(batcher); let mut chunks = Vec::new();
1963 loop {
1964 let chunk = match chunks_rx.try_recv() {
1965 Ok(chunk) => chunk,
1966 Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1967 Err(TryRecvError::Disconnected) => break,
1968 };
1969 chunks.push(chunk);
1970 }
1971
1972 chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1973
1974 eprintln!("Chunks:");
1976 for chunk in &chunks {
1977 eprintln!("{chunk}");
1978 }
1979
1980 assert_eq!(2, chunks.len());
1981
1982 {
1983 let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
1984 let expected_timelines = [
1985 (
1986 *timeline1.name(),
1987 TimeColumn::new(Some(false), timeline1, vec![45, 42, 43].into()),
1988 ),
1989 (
1990 *timeline2.name(),
1991 TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001].into()),
1992 ),
1993 ];
1994 let expected_components = [(
1995 MyPoints::descriptor_points(),
1996 arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
1997 )];
1998 let expected_chunk = Chunk::from_native_row_ids(
1999 chunks[0].id,
2000 entity_path1.clone(),
2001 None,
2002 &expected_row_ids,
2003 expected_timelines.into_iter().collect(),
2004 expected_components.into_iter().collect(),
2005 )?;
2006
2007 eprintln!("Expected:\n{expected_chunk}");
2008 eprintln!("Got:\n{}", chunks[0]);
2009 assert_eq!(expected_chunk, chunks[0]);
2010 }
2011
2012 {
2013 let expected_row_ids = vec![row4.row_id];
2014 let expected_timelines = [
2015 (
2016 *timeline1.name(),
2017 TimeColumn::new(Some(true), timeline1, vec![44].into()),
2018 ),
2019 (
2020 *timeline2.name(),
2021 TimeColumn::new(Some(true), timeline2, vec![1002].into()),
2022 ),
2023 ];
2024 let expected_components = [(
2025 MyPoints::descriptor_points(),
2026 arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(),
2027 )];
2028 let expected_chunk = Chunk::from_native_row_ids(
2029 chunks[1].id,
2030 entity_path1.clone(),
2031 None,
2032 &expected_row_ids,
2033 expected_timelines.into_iter().collect(),
2034 expected_components.into_iter().collect(),
2035 )?;
2036
2037 eprintln!("Expected:\n{expected_chunk}");
2038 eprintln!("Got:\n{}", chunks[1]);
2039 assert_eq!(expected_chunk, chunks[1]);
2040 }
2041
2042 Ok(())
2043 }
2044}