re_chunk/batcher.rs

use std::{
    hash::{Hash as _, Hasher as _},
    sync::Arc,
    time::{Duration, Instant},
};

use arrow::array::{Array as ArrowArray, ArrayRef};
use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use crossbeam::channel::{Receiver, Sender};
use nohash_hasher::IntMap;

use re_arrow_util::arrays_to_list_array_opt;
use re_byte_size::SizeBytes as _;
use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, TimePoint, Timeline, TimelineName};
use re_types_core::ComponentDescriptor;

use crate::{Chunk, ChunkId, ChunkResult, RowId, TimeColumn, chunk::ChunkComponents};

// ---

/// An error that can occur when flushing.
#[derive(Debug, thiserror::Error)]
pub enum BatcherFlushError {
    #[error("Batcher stopped before flushing completed")]
    Closed,

    #[error("Batcher flush timed out - not all messages were sent.")]
    Timeout,
}

/// Errors that can occur when creating/manipulating a [`ChunkBatcher`].
#[derive(thiserror::Error, Debug)]
pub enum ChunkBatcherError {
    /// Error when parsing configuration from environment.
    #[error("Failed to parse config: '{name}={value}': {err}")]
    ParseConfig {
        name: &'static str,
        value: String,
        err: Box<dyn std::error::Error + Send + Sync>,
    },

    /// Error spawning one of the background threads.
    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        name: &'static str,
        err: Box<dyn std::error::Error + Send + Sync>,
    },
}

pub type ChunkBatcherResult<T> = Result<T, ChunkBatcherError>;

/// Callbacks you can install on the [`ChunkBatcher`].
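///
/// A minimal sketch of installing a test hook (the closure body is illustrative
/// only; not compiled here):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let hooks = BatcherHooks {
///     // Log how many rows are waiting to be batched every time a new one arrives.
///     on_insert: Some(Arc::new(|pending: &[PendingRow]| {
///         eprintln!("{} rows pending", pending.len());
///     })),
///     ..BatcherHooks::NONE
/// };
/// ```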
#[derive(Clone, Default)]
pub struct BatcherHooks {
    /// Called when a new row arrives.
    ///
    /// The callback is given the slice of all rows not yet batched,
    /// including the new one.
    ///
    /// Used for testing.
    #[expect(clippy::type_complexity)]
    pub on_insert: Option<Arc<dyn Fn(&[PendingRow]) + Send + Sync>>,

    /// Called when the batcher's configuration changes.
    ///
    /// Called for initial configuration as well as subsequent changes.
    /// Used for testing.
    #[expect(clippy::type_complexity)]
    pub on_config_change: Option<Arc<dyn Fn(&ChunkBatcherConfig) + Send + Sync>>,

    /// Callback to be run when an Arrow Chunk goes out of scope.
    ///
    /// See [`re_log_types::ArrowRecordBatchReleaseCallback`] for more information.
    //
    // TODO(#6412): probably don't need this anymore.
    pub on_release: Option<re_log_types::ArrowRecordBatchReleaseCallback>,
}

impl BatcherHooks {
    pub const NONE: Self = Self {
        on_insert: None,
        on_config_change: None,
        on_release: None,
    };
}

impl PartialEq for BatcherHooks {
    fn eq(&self, other: &Self) -> bool {
        let Self {
            on_insert,
            on_config_change,
            on_release,
        } = self;

        let on_insert_eq = match (on_insert, &other.on_insert) {
            (Some(a), Some(b)) => Arc::ptr_eq(a, b),
            (None, None) => true,
            _ => false,
        };

        let on_config_change_eq = match (on_config_change, &other.on_config_change) {
            (Some(a), Some(b)) => Arc::ptr_eq(a, b),
            (None, None) => true,
            _ => false,
        };

        on_insert_eq && on_config_change_eq && on_release == &other.on_release
    }
}

impl std::fmt::Debug for BatcherHooks {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            on_insert,
            on_config_change,
            on_release,
        } = self;
        f.debug_struct("BatcherHooks")
            .field("on_insert", &on_insert.as_ref().map(|_| "…"))
            .field("on_config_change", &on_config_change.as_ref().map(|_| "…"))
            .field("on_release", &on_release)
            .finish()
    }
}

// ---

/// Defines the different thresholds of the associated [`ChunkBatcher`].
///
/// See [`Self::default`] and [`Self::from_env`].
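///
/// A hedged sketch of a manual configuration (the values themselves are illustrative
/// only; not compiled here):
///
/// ```ignore
/// let config = ChunkBatcherConfig {
///     // Flush at least once per second…
///     flush_tick: std::time::Duration::from_secs(1),
///     // …or as soon as 8 MiB worth of rows have accumulated.
///     flush_num_bytes: 8 * 1024 * 1024,
///     ..ChunkBatcherConfig::DEFAULT
/// };
/// ```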
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChunkBatcherConfig {
    /// Duration of the periodic tick.
    //
    // NOTE: We use `std::time` directly because this library has to deal with `crossbeam` as well
    // as std threads, which both expect standard types anyway.
    //
    // TODO(cmc): Add support for burst debouncing.
    pub flush_tick: Duration,

    /// Flush if the accumulated payload has a size in bytes equal to or greater than this.
    ///
    /// The resulting [`Chunk`] might be larger than `flush_num_bytes`!
    pub flush_num_bytes: u64,

    /// Flush if the accumulated payload has a number of rows equal to or greater than this.
    pub flush_num_rows: u64,

    /// Split a chunk if it contains a number of rows greater than or equal to this threshold
    /// and one or more of its timelines are unsorted.
    pub chunk_max_rows_if_unsorted: u64,

    /// Size of the internal channel of commands.
    ///
    /// Unbounded if left unspecified.
    /// Once a batcher is created, this property cannot be changed.
    pub max_commands_in_flight: Option<u64>,

    /// Size of the internal channel of [`Chunk`]s.
    ///
    /// Unbounded if left unspecified.
    /// Once a batcher is created, this property cannot be changed.
    pub max_chunks_in_flight: Option<u64>,
}
165
166impl Default for ChunkBatcherConfig {
167    fn default() -> Self {
168        Self::DEFAULT
169    }
170}
171
172impl ChunkBatcherConfig {
173    /// Default configuration, applicable to most use cases.
174    pub const DEFAULT: Self = Self {
175        flush_tick: Duration::from_millis(200),
176        flush_num_bytes: 1024 * 1024, // 1 MiB
177        flush_num_rows: u64::MAX,
178        chunk_max_rows_if_unsorted: 256,
179        max_commands_in_flight: None,
180        max_chunks_in_flight: None,
181    };
182
183    /// Low-latency configuration, preferred when streaming directly to a viewer.
184    pub const LOW_LATENCY: Self = Self {
185        flush_tick: Duration::from_millis(8), // We want it fast enough for 60 Hz for real time camera feel
186        ..Self::DEFAULT
187    };
188
189    /// Always flushes ASAP.
190    pub const ALWAYS: Self = Self {
191        flush_tick: Duration::MAX,
192        flush_num_bytes: 0,
193        flush_num_rows: 0,
194        chunk_max_rows_if_unsorted: 256,
195        max_commands_in_flight: None,
196        max_chunks_in_flight: None,
197    };
198
    /// Never flushes unless manually told to (or hitting one of the builtin invariants).
    pub const NEVER: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: u64::MAX,
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
    };

    /// Environment variable to configure [`Self::flush_tick`].
    pub const ENV_FLUSH_TICK: &'static str = "RERUN_FLUSH_TICK_SECS";

    /// Environment variable to configure [`Self::flush_num_bytes`].
    pub const ENV_FLUSH_NUM_BYTES: &'static str = "RERUN_FLUSH_NUM_BYTES";

    /// Environment variable to configure [`Self::flush_num_rows`].
    pub const ENV_FLUSH_NUM_ROWS: &'static str = "RERUN_FLUSH_NUM_ROWS";

    /// Environment variable to configure [`Self::chunk_max_rows_if_unsorted`].
    //
    // NOTE: Shared with the same env-var on the store side, for consistency.
    pub const ENV_CHUNK_MAX_ROWS_IF_UNSORTED: &'static str = "RERUN_CHUNK_MAX_ROWS_IF_UNSORTED";

    /// Environment variable to configure [`Self::chunk_max_rows_if_unsorted`].
    #[deprecated(note = "use `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` instead")]
    const ENV_MAX_CHUNK_ROWS_IF_UNSORTED: &'static str = "RERUN_MAX_CHUNK_ROWS_IF_UNSORTED";

    /// Creates a new `ChunkBatcherConfig` using the default values, optionally overridden
    /// through the environment.
    ///
    /// See [`Self::apply_env`].
    #[inline]
    pub fn from_env() -> ChunkBatcherResult<Self> {
        Self::default().apply_env()
    }

    /// Returns a copy of `self`, overriding existing fields with values from the environment if
    /// they are present.
    ///
    /// See [`Self::ENV_FLUSH_TICK`], [`Self::ENV_FLUSH_NUM_BYTES`], [`Self::ENV_FLUSH_NUM_ROWS`]
    /// and [`Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED`].
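    ///
    /// A sketch of the override behavior (the value is illustrative; not compiled here):
    ///
    /// ```ignore
    /// // SAFETY: only safe as long as nothing else reads the environment concurrently.
    /// unsafe { std::env::set_var(ChunkBatcherConfig::ENV_FLUSH_NUM_ROWS, "10000") };
    ///
    /// let config = ChunkBatcherConfig::DEFAULT.apply_env()?;
    /// assert_eq!(10_000, config.flush_num_rows);
    /// ```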
    pub fn apply_env(&self) -> ChunkBatcherResult<Self> {
        let mut new = self.clone();

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_TICK) {
            let flush_duration_secs: f64 =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_TICK,
                    value: s.clone(),
                    err: Box::new(err),
                })?;

            new.flush_tick = Duration::try_from_secs_f64(flush_duration_secs).map_err(|err| {
                ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_TICK,
                    value: s.clone(),
                    err: Box::new(err),
                }
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) {
            if let Some(num_bytes) = re_format::parse_bytes(&s) {
                // e.g. "10MB"
                new.flush_num_bytes = num_bytes.unsigned_abs();
            } else {
                // Assume it's just an integer
                new.flush_num_bytes = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_NUM_BYTES,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
            }
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_ROWS) {
            new.flush_num_rows = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                name: Self::ENV_FLUSH_NUM_ROWS,
                value: s.clone(),
                err: Box::new(err),
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        // Deprecated
        #[expect(deprecated)]
        if let Ok(s) = std::env::var(Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        Ok(new)
    }
}

#[test]
fn chunk_batcher_config() {
    #![expect(unsafe_code)] // It's only a test

    // Detect breaking changes in our environment variables.
    // SAFETY: it's a test
    unsafe {
        std::env::set_var("RERUN_FLUSH_TICK_SECS", "0.3");
        std::env::set_var("RERUN_FLUSH_NUM_BYTES", "42");
        std::env::set_var("RERUN_FLUSH_NUM_ROWS", "666");
        std::env::set_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED", "7777");
    }

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 7777,
        ..Default::default()
    };
    assert_eq!(expected, config);

    // SAFETY: it's a test
    unsafe {
        std::env::set_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED", "9999");
    }

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 9999,
        ..Default::default()
    };
    assert_eq!(expected, config);
}

// ---

/// Implements an asynchronous batcher that coalesces [`PendingRow`]s into [`Chunk`]s based upon
/// the thresholds defined in the associated [`ChunkBatcherConfig`].
///
/// ## Batching vs. splitting
///
/// The batching process is triggered solely by time and space thresholds -- whichever is hit first.
/// This process will result in one big dataframe.
///
/// The splitting process will then run on top of that big dataframe, and split it further down
/// into smaller [`Chunk`]s.
/// Specifically, the dataframe will be split into enough [`Chunk`]s so as to guarantee that:
/// * no chunk contains data for more than one entity path
/// * no chunk contains rows with different sets of timelines
/// * no chunk uses more than one datatype for a given component
/// * no chunk contains more rows than a pre-configured threshold if one or more timelines are unsorted
///
/// ## Multithreading and ordering
///
/// [`ChunkBatcher`] can be cheaply cloned and used freely across any number of threads.
///
/// Internally, all operations are linearized into a pipeline:
/// - All operations sent by a given thread will take effect in the same exact order as that
///   thread originally sent them in, from its point of view.
/// - There isn't any well-defined global order across multiple threads.
///
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all
/// previous data sent by the calling thread has been batched and sent down the channel returned
/// by [`ChunkBatcher::chunks`]; no more, no less.
///
/// ## Shutdown
///
/// The batcher can only be shut down by dropping all instances of it, at which point it will
/// automatically take care of flushing any pending data that might remain in the pipeline.
///
/// Shutting down cannot ever block.
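///
/// ## Example
///
/// A minimal end-to-end sketch (error handling elided; `entity_path` and `row` are assumed
/// to be built elsewhere; not compiled here):
///
/// ```ignore
/// let batcher = ChunkBatcher::new(ChunkBatcherConfig::DEFAULT, BatcherHooks::NONE)?;
///
/// // Subscribe to the output *before* pushing data.
/// let chunks_rx = batcher.chunks();
///
/// // Push rows from any number of threads.
/// batcher.push_row(entity_path, row);
///
/// // Guarantee that everything pushed so far by this thread has been batched and sent.
/// batcher.flush_blocking(std::time::Duration::from_secs(5))?;
///
/// for chunk in chunks_rx.try_iter() {
///     // Forward the batched `Chunk`s to their final destination.
/// }
/// ```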
#[derive(Clone)]
pub struct ChunkBatcher {
    inner: Arc<ChunkBatcherInner>,
}

// NOTE: The receiving end of the command stream as well as the sending end of the chunk stream are
// owned solely by the batching thread.
struct ChunkBatcherInner {
    /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed,
    /// therefore the `Drop` implementation can be sure that no more data will come in while it's
    /// running.
    tx_cmds: Sender<Command>,
    // NOTE: Option so we can make shutdown non-blocking even with bounded channels.
    rx_chunks: Option<Receiver<Chunk>>,
    cmds_to_chunks_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for ChunkBatcherInner {
    fn drop(&mut self) {
        // Drop the receiving end of the chunk stream first and foremost, so that we don't block
        // even if the output channel is bounded and currently full.
        if let Some(rx_chunks) = self.rx_chunks.take()
            && !rx_chunks.is_empty()
        {
            re_log::warn!("Dropping data");
        }

        // NOTE: The command channel is private, if we're here, nothing is currently capable of
        // sending data down the pipeline.
        self.tx_cmds.send(Command::Shutdown).ok();
        if let Some(handle) = self.cmds_to_chunks_handle.take() {
            handle.join().ok();
        }
    }
}

enum Command {
    AppendChunk(Chunk),
    AppendRow(EntityPath, PendingRow),
    Flush { on_done: Sender<()> },
    UpdateConfig(ChunkBatcherConfig),
    Shutdown,
}

impl Command {
    fn flush() -> (Self, Receiver<()>) {
        let (tx, rx) = crossbeam::channel::bounded(1); // oneshot
        (Self::Flush { on_done: tx }, rx)
    }
}

impl ChunkBatcher {
    /// Creates a new [`ChunkBatcher`] using the passed-in `config`.
    ///
    /// The returned object must be kept in scope: dropping it will trigger a clean shutdown of the
    /// batcher.
    #[must_use = "Batching threads will automatically shut down when this object is dropped"]
    #[expect(clippy::needless_pass_by_value)]
    pub fn new(config: ChunkBatcherConfig, hooks: BatcherHooks) -> ChunkBatcherResult<Self> {
        let (tx_cmds, rx_cmd) = match config.max_commands_in_flight {
            Some(cap) => crossbeam::channel::bounded(cap as _),
            None => crossbeam::channel::unbounded(),
        };

        let (tx_chunk, rx_chunks) = match config.max_chunks_in_flight {
            Some(cap) => crossbeam::channel::bounded(cap as _),
            None => crossbeam::channel::unbounded(),
        };

        let cmds_to_chunks_handle = {
            const NAME: &str = "ChunkBatcher::cmds_to_chunks";
            std::thread::Builder::new()
                .name(NAME.into())
                .spawn({
                    let config = config.clone();
                    move || batching_thread(config, hooks, rx_cmd, tx_chunk)
                })
                .map_err(|err| ChunkBatcherError::SpawnThread {
                    name: NAME,
                    err: Box::new(err),
                })?
        };

        re_log::debug!(?config, "creating new chunk batcher");

        let inner = ChunkBatcherInner {
            tx_cmds,
            rx_chunks: Some(rx_chunks),
            cmds_to_chunks_handle: Some(cmds_to_chunks_handle),
        };

        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    // --- Send commands ---

    pub fn push_chunk(&self, chunk: Chunk) {
        self.inner.push_chunk(chunk);
    }

    /// Pushes a [`PendingRow`] down the batching pipeline.
    ///
    /// This will compute the size of the row on the batching thread!
    ///
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
    #[inline]
    pub fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.inner.push_row(entity_path, row);
    }

    /// Initiates a flush of the pipeline and returns immediately.
    ///
    /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
    #[inline]
    pub fn flush_async(&self) {
        self.inner.flush_async();
    }

    /// Initiates a flush of the batching pipeline and waits for it to propagate.
    ///
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
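    ///
    /// A short sketch of handling the possible outcomes (the timeout is illustrative;
    /// not compiled here):
    ///
    /// ```ignore
    /// match batcher.flush_blocking(Duration::from_secs(2)) {
    ///     Ok(()) => {}
    ///     Err(BatcherFlushError::Timeout) => re_log::warn!("flush timed out"),
    ///     Err(BatcherFlushError::Closed) => re_log::warn!("batcher already shut down"),
    /// }
    /// ```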
    #[inline]
    pub fn flush_blocking(&self, timeout: Duration) -> Result<(), BatcherFlushError> {
        self.inner.flush_blocking(timeout)
    }

    /// Updates the batcher's configuration as far as possible.
    pub fn update_config(&self, config: ChunkBatcherConfig) {
        self.inner.update_config(config);
    }

    // --- Subscribe to chunks ---

    /// Returns a _shared_ channel on which the batched [`Chunk`]s are sent.
    ///
    /// Shutting down the batcher will close this channel.
    ///
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
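    ///
    /// A sketch of draining the channel without blocking (not compiled here):
    ///
    /// ```ignore
    /// let chunks_rx = batcher.chunks();
    /// // `try_iter` only yields the chunks that are already available.
    /// for chunk in chunks_rx.try_iter() {
    ///     // Send `chunk` downstream.
    /// }
    /// ```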
    pub fn chunks(&self) -> Receiver<Chunk> {
        // NOTE: `rx_chunks` is only ever taken when the batcher as a whole is dropped, at which
        // point it is impossible to call this method.
        #[expect(clippy::unwrap_used)]
        self.inner.rx_chunks.clone().unwrap()
    }
}

impl ChunkBatcherInner {
    fn push_chunk(&self, chunk: Chunk) {
        self.send_cmd(Command::AppendChunk(chunk));
    }

    fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.send_cmd(Command::AppendRow(entity_path, row));
    }

    fn flush_async(&self) {
        let (flush_cmd, _) = Command::flush();
        self.send_cmd(flush_cmd);
    }

    fn flush_blocking(&self, timeout: Duration) -> Result<(), BatcherFlushError> {
        use crossbeam::channel::RecvTimeoutError;

        let (flush_cmd, on_done) = Command::flush();
        self.send_cmd(flush_cmd);

        on_done.recv_timeout(timeout).map_err(|err| match err {
            RecvTimeoutError::Timeout => BatcherFlushError::Timeout,
            RecvTimeoutError::Disconnected => BatcherFlushError::Closed,
        })
    }

    fn update_config(&self, config: ChunkBatcherConfig) {
        self.send_cmd(Command::UpdateConfig(config));
    }

    fn send_cmd(&self, cmd: Command) {
        // NOTE: Internal channels can never be closed outside of the `Drop` impl, this cannot
        // fail.
        self.tx_cmds.send(cmd).ok();
    }
}

#[expect(clippy::needless_pass_by_value)]
fn batching_thread(
    mut config: ChunkBatcherConfig,
    hooks: BatcherHooks,
    rx_cmd: Receiver<Command>,
    tx_chunk: Sender<Chunk>,
) {
    let mut rx_tick = crossbeam::channel::tick(config.flush_tick);

    struct Accumulator {
        latest: Instant,
        entity_path: EntityPath,
        pending_rows: Vec<PendingRow>,
        pending_num_bytes: u64,
    }

    impl Accumulator {
        fn new(entity_path: EntityPath) -> Self {
            Self {
                entity_path,
                latest: Instant::now(),
                pending_rows: Default::default(),
                pending_num_bytes: Default::default(),
            }
        }

        fn reset(&mut self) {
            self.latest = Instant::now();
            self.pending_rows.clear();
            self.pending_num_bytes = 0;
        }
    }

    let mut accs: IntMap<EntityPath, Accumulator> = IntMap::default();

    fn do_push_row(acc: &mut Accumulator, row: PendingRow) {
        acc.pending_num_bytes += row.total_size_bytes();
        acc.pending_rows.push(row);
    }

    fn do_flush_all(
        acc: &mut Accumulator,
        tx_chunk: &Sender<Chunk>,
        reason: &str,
        chunk_max_rows_if_unsorted: u64,
    ) {
        let rows = std::mem::take(&mut acc.pending_rows);
        if rows.is_empty() {
            return;
        }

        re_log::trace!(
            "Flushing {} rows and {} bytes. Reason: {reason}",
            rows.len(),
            re_format::format_bytes(acc.pending_num_bytes as _)
        );

        let chunks =
            PendingRow::many_into_chunks(acc.entity_path.clone(), chunk_max_rows_if_unsorted, rows);
        for chunk in chunks {
            let chunk = match chunk {
                Ok(chunk) => chunk,
                Err(err) => {
                    re_log::error!(%err, "corrupt chunk detected, dropping");
                    continue;
                }
            };

            // NOTE: This can only fail if all receivers have been dropped, which simply cannot happen
            // as long as the batching thread is alive… which is where we currently are.

            if !chunk.components.is_empty() {
                // make sure the chunk didn't contain *only* indicators!
                tx_chunk.send(chunk).ok();
            } else {
                re_log::warn_once!(
                    "Dropping chunk without components. Entity path: {}",
                    chunk.entity_path()
                );
            }
        }

        acc.reset();
    }

    re_log::trace!(
        "Flushing every: {:.2}s, {} rows, {}",
        config.flush_tick.as_secs_f64(),
        config.flush_num_rows,
        re_format::format_bytes(config.flush_num_bytes as _),
    );
    // Signal initial config
    if let Some(on_config_change) = hooks.on_config_change.as_ref() {
        on_config_change(&config);
    }

    // Set to `true` when a flush is triggered for a reason other than hitting the time threshold,
    // so that the next tick will not unnecessarily fire early.
    let mut skip_next_tick = false;

    use crossbeam::select;
    loop {
        select! {
            recv(rx_cmd) -> cmd => {
                let Ok(cmd) = cmd else {
                    // All command senders are gone, which can only happen if the
                    // `ChunkBatcher` itself has been dropped.
                    break;
                };

                match cmd {
                    Command::AppendChunk(chunk) => {
                        // NOTE: This can only fail if all receivers have been dropped, which simply cannot happen
                        // as long as the batching thread is alive… which is where we currently are.

                        if !chunk.components.is_empty() {
                            // make sure the chunk didn't contain *only* indicators!
                            tx_chunk.send(chunk).ok();
                        } else {
                            re_log::warn_once!(
                                "Dropping chunk without components. Entity path: {}",
                                chunk.entity_path()
                            );
                        }
                    },
                    Command::AppendRow(entity_path, row) => {
                        let acc = accs.entry(entity_path.clone())
                            .or_insert_with(|| Accumulator::new(entity_path));
                        do_push_row(acc, row);

                        if let Some(on_insert) = hooks.on_insert.as_ref() {
                            on_insert(&acc.pending_rows);
                        }

                        if acc.pending_rows.len() as u64 >= config.flush_num_rows {
                            do_flush_all(acc, &tx_chunk, "rows", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        } else if acc.pending_num_bytes >= config.flush_num_bytes {
                            do_flush_all(acc, &tx_chunk, "bytes", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        }
                    },

                    Command::Flush{ on_done } => {
                        skip_next_tick = true;
                        for acc in accs.values_mut() {
                            do_flush_all(acc, &tx_chunk, "manual", config.chunk_max_rows_if_unsorted);
                        }
                        on_done.send(()).ok();
                    },

                    Command::UpdateConfig(new_config) => {
                        // Warn if properties changed that we can't change here.
                        if config.max_commands_in_flight != new_config.max_commands_in_flight ||
                            config.max_chunks_in_flight != new_config.max_chunks_in_flight {
                            re_log::warn!("Cannot change max commands/chunks in flight after batcher has been created. Previous max commands/chunks: {:?}/{:?}, new max commands/chunks: {:?}/{:?}",
                                            config.max_commands_in_flight, config.max_chunks_in_flight, new_config.max_commands_in_flight, new_config.max_chunks_in_flight);
                        }

                        re_log::trace!("Updated batcher config: {:?}", new_config);
                        if let Some(on_config_change) = hooks.on_config_change.as_ref() {
                            on_config_change(&new_config);
                        }

                        config = new_config;
                        rx_tick = crossbeam::channel::tick(config.flush_tick);
                    }

                    Command::Shutdown => break,
                }
            },

            recv(rx_tick) -> _ => {
                if skip_next_tick {
                    skip_next_tick = false;
                } else {
                    // TODO(cmc): It would probably be better to have a ticker per entity path. Maybe. At some point.
                    for acc in accs.values_mut() {
                        do_flush_all(acc, &tx_chunk, "tick", config.chunk_max_rows_if_unsorted);
                    }
                }
            },
        };
    }

    drop(rx_cmd);
    for acc in accs.values_mut() {
        do_flush_all(
            acc,
            &tx_chunk,
            "shutdown",
            config.chunk_max_rows_if_unsorted,
        );
    }
    drop(tx_chunk);

    // NOTE: The receiving end of the command stream as well as the sending end of the chunk
    // stream are owned solely by this thread.
    // Past this point, all command writes and all chunk reads will return `ErrDisconnected`.
}

// ---

/// A single row's worth of data (i.e. a single log call).
///
/// Send those to the batcher to build up a [`Chunk`].
#[derive(Debug, Clone)]
pub struct PendingRow {
    /// Auto-generated `TUID`, uniquely identifying this event and keeping track of the client's
    /// wall-clock.
    pub row_id: RowId,

    /// User-specified [`TimePoint`] for this event.
    pub timepoint: TimePoint,

    /// The component data.
    ///
    /// Each array is a single component, i.e. _not_ a list array.
    pub components: IntMap<ComponentDescriptor, ArrayRef>,
}

impl PendingRow {
    #[inline]
    pub fn new(timepoint: TimePoint, components: IntMap<ComponentDescriptor, ArrayRef>) -> Self {
        Self {
            row_id: RowId::new(),
            timepoint,
            components,
        }
    }
}

impl re_byte_size::SizeBytes for PendingRow {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        row_id.heap_size_bytes() + timepoint.heap_size_bytes() + components.heap_size_bytes()
    }
}

impl PendingRow {
    /// Turn a single row into a [`Chunk`] of its own.
    ///
    /// That's very wasteful; probably don't do it outside of testing, or unless you have very
    /// good reasons to.
    ///
    /// See also [`Self::many_into_chunks`].
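    ///
    /// A minimal sketch (`timepoint`, `descriptor` and `array` are assumed to be built
    /// elsewhere; not compiled here):
    ///
    /// ```ignore
    /// let row = PendingRow::new(timepoint, std::iter::once((descriptor, array)).collect());
    /// let chunk = row.into_chunk("my/entity".into())?;
    /// ```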
    pub fn into_chunk(self, entity_path: EntityPath) -> ChunkResult<Chunk> {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        let timelines = timepoint
            .into_iter()
            .map(|(timeline_name, cell)| {
                let times = ArrowScalarBuffer::from(vec![cell.as_i64()]);
                let time_column =
                    TimeColumn::new(Some(true), Timeline::new(timeline_name, cell.typ()), times);
                (timeline_name, time_column)
            })
            .collect();

        let mut per_desc = ChunkComponents::default();
        for (component_desc, array) in components {
            let list_array = arrays_to_list_array_opt(&[Some(&*array as _)]);
            if let Some(list_array) = list_array {
                per_desc.insert(component_desc, list_array);
            }
        }

        Chunk::from_native_row_ids(
            ChunkId::new(),
            entity_path,
            Some(true),
            &[row_id],
            timelines,
            per_desc,
        )
    }

    /// This turns a batch of [`PendingRow`]s into a [`Chunk`].
    ///
    /// There are a lot of conditions to fulfill for a [`Chunk`] to be valid: this helper makes
    /// sure to fulfill all of them by splitting the chunk into one or more pieces as necessary.
    ///
    /// In particular, a [`Chunk`] cannot:
    /// * contain data for more than one entity path
    /// * contain rows with different sets of timelines
    /// * use more than one datatype for a given component
    /// * contain more rows than a pre-configured threshold if one or more timelines are unsorted
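    ///
    /// A sketch of the splitting behavior (`row_a` and `row_b` are assumed to be built
    /// elsewhere, on different sets of timelines; the threshold is illustrative; not
    /// compiled here):
    ///
    /// ```ignore
    /// // Rows logged on different sets of timelines can never share a chunk,
    /// // so this is guaranteed to yield two chunks.
    /// let chunks: Vec<Chunk> =
    ///     PendingRow::many_into_chunks("my/entity".into(), 4096, vec![row_a, row_b])
    ///         .collect::<Result<_, _>>()?;
    /// assert_eq!(2, chunks.len());
    /// ```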
    //
    // TODO(cmc): there are lots of performance improvement opportunities in this one, but let's
    // see if that actually matters in practice first.
    pub fn many_into_chunks(
        entity_path: EntityPath,
        chunk_max_rows_if_unsorted: u64,
        mut rows: Vec<Self>,
    ) -> impl Iterator<Item = ChunkResult<Chunk>> {
        re_tracing::profile_function!();

        // First things first, sort all the rows by row ID -- that's our global order and it holds
        // true no matter what.
        {
            re_tracing::profile_scope!("sort rows");
            rows.sort_by_key(|row| row.row_id);
        }

        // Then organize the rows in micro batches -- one batch per unique set of timelines.
        let mut per_timeline_set: IntMap<u64 /* Timeline set */, Vec<Self>> = Default::default();
        {
            re_tracing::profile_scope!("compute timeline sets");

            // The hash is deterministic because the traversal of a `TimePoint` is itself
            // deterministic: `TimePoint` is backed by a `BTreeMap`.
            for row in rows {
                let mut hasher = ahash::AHasher::default();
                row.timepoint.timeline_names().for_each(|timeline| {
                    <TimelineName as std::hash::Hash>::hash(timeline, &mut hasher);
                });

                per_timeline_set
                    .entry(hasher.finish())
                    .or_default()
                    .push(row);
            }
        }

        per_timeline_set.into_values().flat_map(move |rows| {
            re_tracing::profile_scope!("iterate per timeline set");

            // Then we split the micro batches even further -- one sub-batch per unique set of datatypes.
            let mut per_datatype_set: IntMap<u64 /* ArrowDatatype set */, Vec<Self>> =
                Default::default();
            {
                re_tracing::profile_scope!("compute datatype sets");

                // The hash depends on the order in which the `PendingRow` was created (i.e.
                // the order in which its components were inserted).
                //
                // This is because the components are stored in an `IntMap`, which doesn't do any
                // hashing. For that reason, the traversal order of an `IntMap` is deterministic:
                // it's always the same for `IntMap`s that share the same keys, as long as these
                // keys were inserted in the same order.
                // See `intmap_order_is_deterministic` in the tests below.
                //
                // In practice, the `PendingRow`s in a given program are always built in the same
                // order for the duration of that program, which is why this works.
                // See `simple_but_hashes_might_not_match` in the tests below.
                for row in rows {
                    let mut hasher = ahash::AHasher::default();
                    row.components
                        .values()
                        .for_each(|array| array.data_type().hash(&mut hasher));
                    per_datatype_set
                        .entry(hasher.finish())
                        .or_default()
                        .push(row);
                }
            }

            // And finally we can build the resulting chunks.
            let entity_path = entity_path.clone();
            per_datatype_set.into_values().flat_map(move |rows| {
                re_tracing::profile_scope!("iterate per datatype set");

                let mut row_ids: Vec<RowId> = Vec::with_capacity(rows.len());
                let mut timelines: IntMap<TimelineName, PendingTimeColumn> = IntMap::default();

                // Create all the logical list arrays that we're going to need, accounting for the
                // possibility of sparse components in the data.
                let mut all_components: IntMap<ComponentDescriptor, Vec<Option<&dyn ArrowArray>>> =
                    IntMap::default();
                for row in &rows {
                    for component_desc in row.components.keys() {
                        all_components.entry(component_desc.clone()).or_default();
                    }
                }

                let mut chunks = Vec::new();

                let mut components = all_components.clone();
                for row in &rows {
                    let Self {
                        row_id,
                        timepoint: row_timepoint,
                        components: row_components,
                    } = row;

                    // Look for unsorted timelines -- if we find any, and the chunk is larger than
                    // the pre-configured `chunk_max_rows_if_unsorted` threshold, then split _even_
                    // further!
                    for (&timeline_name, cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });

                        if !row_ids.is_empty() // just being extra cautious
                            && row_ids.len() as u64 >= chunk_max_rows_if_unsorted
                            && !time_column.is_sorted
                        {
                            chunks.push(Chunk::from_native_row_ids(
                                ChunkId::new(),
                                entity_path.clone(),
                                Some(true),
                                &std::mem::take(&mut row_ids),
                                std::mem::take(&mut timelines)
                                    .into_iter()
                                    .map(|(name, time_column)| (name, time_column.finish()))
                                    .collect(),
                                {
                                    let mut per_desc = ChunkComponents::default();
                                    for (component_desc, arrays) in std::mem::take(&mut components)
                                    {
                                        let list_array = arrays_to_list_array_opt(&arrays);
                                        if let Some(list_array) = list_array {
                                            per_desc.insert(component_desc, list_array);
                                        }
                                    }
                                    per_desc
                                },
                            ));

                            components = all_components.clone();
                        }
                    }

                    row_ids.push(*row_id);

                    for (&timeline_name, &cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });
                        time_column.push(cell.into());
                    }

                    for (component_desc, arrays) in &mut components {
                        // NOTE: This will push `None` if the row doesn't actually hold a value for this
                        // component -- these are sparse list arrays!
                        arrays.push(
                            row_components
                                .get(component_desc)
                                .map(|array| &**array as &dyn ArrowArray),
                        );
                    }
                }

                chunks.push(Chunk::from_native_row_ids(
                    ChunkId::new(),
                    entity_path.clone(),
                    Some(true),
                    &std::mem::take(&mut row_ids),
                    timelines
                        .into_iter()
                        .map(|(timeline, time_column)| (timeline, time_column.finish()))
                        .collect(),
                    {
                        let mut per_desc = ChunkComponents::default();
                        for (component_desc, arrays) in components {
                            let list_array = arrays_to_list_array_opt(&arrays);
                            if let Some(list_array) = list_array {
                                per_desc.insert(component_desc, list_array);
                            }
                        }
                        per_desc
                    },
                ));

                chunks
            })
        })
    }
}

/// Helper class used to buffer time data.
///
/// See [`PendingRow::many_into_chunks`] for usage.
struct PendingTimeColumn {
    timeline: Timeline,
    times: Vec<i64>,
    is_sorted: bool,
    time_range: AbsoluteTimeRange,
}

impl PendingTimeColumn {
    fn new(timeline: Timeline) -> Self {
        Self {
            timeline,
            times: Default::default(),
            is_sorted: true,
            time_range: AbsoluteTimeRange::EMPTY,
        }
    }

    /// Push a single time value at the end of this chunk.
    fn push(&mut self, time: TimeInt) {
        let Self {
            timeline: _,
            times,
            is_sorted,
            time_range,
        } = self;

        *is_sorted &= times.last().copied().unwrap_or(TimeInt::MIN.as_i64()) <= time.as_i64();
        time_range.set_min(TimeInt::min(time_range.min(), time));
        time_range.set_max(TimeInt::max(time_range.max(), time));
        times.push(time.as_i64());
    }

    fn finish(self) -> TimeColumn {
        let Self {
            timeline,
            times,
            is_sorted,
            time_range,
        } = self;

        TimeColumn {
            timeline,
            times: ArrowScalarBuffer::from(times),
            is_sorted,
            time_range,
        }
    }
}

// ---

// NOTE:
// These tests only cover the chunk splitting conditions described in `many_into_chunks`.
// Temporal and spatial thresholds are already taken care of by the RecordingStream test suite.

#[cfg(test)]
mod tests {
    use crossbeam::channel::TryRecvError;

    use re_log_types::example_components::{MyIndex, MyLabel, MyPoint, MyPoint64, MyPoints};
    use re_types_core::Loggable as _;

    use super::*;

    /// A bunch of rows that don't fit any of the split conditions should end up together.
    #[test]
    fn simple() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        let components1 = [
            (MyPoints::descriptor_points(), points1.clone()),
            (MyPoints::descriptor_labels(), labels1.clone()),
            (MyIndex::partial_descriptor(), indices1.clone()),
        ];
        let components2 = [
            (MyPoints::descriptor_points(), points2.clone()),
            (MyPoints::descriptor_labels(), labels2.clone()),
            (MyIndex::partial_descriptor(), indices2.clone()),
        ];
        let components3 = [
            (MyPoints::descriptor_points(), points3.clone()),
            (MyPoints::descriptor_labels(), labels3.clone()),
            (MyIndex::partial_descriptor(), indices3.clone()),
        ];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 43, 44].into()),
            )];
            let expected_components = [
                (
                    MyPoints::descriptor_points(),
                    arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
                ), //
                (
                    MyPoints::descriptor_labels(),
                    arrays_to_list_array_opt(&[&*labels1, &*labels2, &*labels3].map(Some)).unwrap(),
                ), //
                (
                    MyIndex::partial_descriptor(),
                    arrays_to_list_array_opt(&[&*indices1, &*indices2, &*indices3].map(Some))
                        .unwrap(),
                ), //
            ];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

    #[test]
    #[expect(clippy::len_zero)]
    fn simple_but_hashes_might_not_match() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        let components1 = [
            (MyIndex::partial_descriptor(), indices1.clone()),
            (MyPoints::descriptor_points(), points1.clone()),
            (MyPoints::descriptor_labels(), labels1.clone()),
        ];
        let components2 = [
            (MyPoints::descriptor_points(), points2.clone()),
            (MyPoints::descriptor_labels(), labels2.clone()),
            (MyIndex::partial_descriptor(), indices2.clone()),
        ];
        let components3 = [
            (MyPoints::descriptor_labels(), labels3.clone()),
            (MyIndex::partial_descriptor(), indices3.clone()),
            (MyPoints::descriptor_points(), points3.clone()),
        ];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        // The rows' components were inserted in different orders, and therefore the resulting
        // `IntMap`s *may* have different traversal orders, which ultimately means that the datatype
        // hashes *may* end up being different: i.e., possibly no batching.
        //
        // In practice, it's still possible to get lucky and end up with two maps that just happen
        // to share the same iteration order regardless, which is why this assertion is overly broad.
        // Try running this test with `--show-output`.
1301        assert!(chunks.len() >= 1);
1302
1303        Ok(())
1304    }
1305
1306    #[test]
1307    #[expect(clippy::zero_sized_map_values)]
1308    fn intmap_order_is_deterministic() {
1309        let descriptors = [
1310            MyPoints::descriptor_points(),
1311            MyPoints::descriptor_colors(),
1312            MyPoints::descriptor_labels(),
1313            MyPoint64::partial_descriptor(),
1314            MyIndex::partial_descriptor(),
1315        ];
1316
1317        let expected: IntMap<ComponentDescriptor, ()> =
1318            descriptors.iter().cloned().map(|d| (d, ())).collect();
1319        let expected: Vec<_> = expected.into_keys().collect();
1320
1321        for _ in 0..1_000 {
1322            let got_collect: IntMap<ComponentDescriptor, ()> =
1323                descriptors.clone().into_iter().map(|d| (d, ())).collect();
1324            let got_collect: Vec<_> = got_collect.into_keys().collect();
1325
1326            let mut got_insert: IntMap<ComponentDescriptor, ()> = Default::default();
1327            for d in descriptors.clone() {
1328                got_insert.insert(d, ());
1329            }
1330            let got_insert: Vec<_> = got_insert.into_keys().collect();
1331
1332            assert_eq!(expected, got_collect);
1333            assert_eq!(expected, got_insert);
1334        }
1335    }
1336
1337    /// A bunch of rows that don't fit any of the split conditions should end up together.
1338    #[test]
1339    fn simple_static() -> anyhow::Result<()> {
1340        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;
1341
1342        let static_ = TimePoint::default();
1343
1344        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1345        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
1346        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1347
1348        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
1349        let components2 = [(MyPoints::descriptor_points(), points2.clone())];
1350        let components3 = [(MyPoints::descriptor_points(), points3.clone())];
1351
1352        let row1 = PendingRow::new(static_.clone(), components1.into_iter().collect());
1353        let row2 = PendingRow::new(static_.clone(), components2.into_iter().collect());
1354        let row3 = PendingRow::new(static_.clone(), components3.into_iter().collect());
1355
1356        let entity_path1: EntityPath = "a/b/c".into();
1357        batcher.push_row(entity_path1.clone(), row1.clone());
1358        batcher.push_row(entity_path1.clone(), row2.clone());
1359        batcher.push_row(entity_path1.clone(), row3.clone());
1360
1361        let chunks_rx = batcher.chunks();
1362        drop(batcher); // flush and close
1363
1364        let mut chunks = Vec::new();
1365        loop {
1366            let chunk = match chunks_rx.try_recv() {
1367                Ok(chunk) => chunk,
1368                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1369                Err(TryRecvError::Disconnected) => break,
1370            };
1371            chunks.push(chunk);
1372        }
1373
        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

    /// A bunch of rows belonging to different entities will end up in different batches.
    #[test]
    fn different_entities() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())];
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "ent1".into();
        let entity_path2: EntityPath = "ent2".into();
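        // The entity path is one of the split conditions: interleaved pushes to
        // two entities must still produce one chunk per entity.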
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path2.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row2.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![43].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path2.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

    /// A bunch of rows with different sets of timelines will end up in different batches.
    #[test]
    fn different_timelines() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline1, 43)
            .with(timeline2, 1000);
        let timepoint3 = TimePoint::default()
            .with(timeline1, 44)
            .with(timeline2, 1001);
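        // row1 only covers `log_time`, while rows 2 and 3 cover both timelines:
        // differing sets of timelines are a split condition.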

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())];
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row2.row_id, row3.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(true), timeline1, vec![43, 44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(true), timeline2, vec![1000, 1001].into()),
                ),
            ];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

    /// A bunch of rows with different datatypes will end up in different batches.
    #[test]
    fn different_datatypes() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 =
            MyPoint64::to_arrow([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())]; // same name, different datatype
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];
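        // `MyPoint64` reuses the `points` descriptor but with a different Arrow
        // datatype; mismatched datatypes cannot be coalesced into one list array,
        // so row 2 has to end up in a chunk of its own.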

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row2.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![43].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

    /// If one or more of the timelines end up unsorted, but the batch is below the unsorted length
    /// threshold, we don't do anything special.
    #[test]
    fn unsorted_timeline_below_threshold() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(
            ChunkBatcherConfig {
                chunk_max_rows_if_unsorted: 1000,
                ..ChunkBatcherConfig::NEVER
            },
            BatcherHooks::NONE,
        )?;
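        // 4 unsorted rows stay well below the 1000-row threshold, so everything
        // should land in a single, unsorted chunk.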

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let timepoint1 = TimePoint::default()
            .with(timeline2, 1000)
            .with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline2, 1001)
            .with(timeline1, 43);
        let timepoint3 = TimePoint::default()
            .with(timeline2, 1002)
            .with(timeline1, 44);
        let timepoint4 = TimePoint::default()
            .with(timeline2, 1003)
            .with(timeline1, 45);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
        let points4 =
            MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())];
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];
        let components4 = [(MyPoints::descriptor_points(), points4.clone())];

        let row1 = PendingRow::new(timepoint4.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint1.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint2.clone(), components3.into_iter().collect());
        let row4 = PendingRow::new(timepoint3.clone(), components4.into_iter().collect());
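        // Note the shuffle: row1 carries timepoint4 while rows 2 to 4 carry
        // timepoints 1 to 3, so both time columns end up unsorted.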

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());
        batcher.push_row(entity_path1.clone(), row4.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
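            // `Some(false)` marks each time column as known-unsorted: below the
            // threshold, rows are kept in arrival order rather than re-sorted.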
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id, row4.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(false), timeline1, vec![45, 42, 43, 44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001, 1002].into()),
                ),
            ];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3, &*points4].map(Some))
                    .unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

    /// If one or more of the timelines end up unsorted, and the batch is above the unsorted length
    /// threshold, we split it.
    #[test]
    fn unsorted_timeline_above_threshold() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(
            ChunkBatcherConfig {
                chunk_max_rows_if_unsorted: 3,
                ..ChunkBatcherConfig::NEVER
            },
            BatcherHooks::NONE,
        )?;
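        // With a threshold of 3, the unsorted batch is split off as soon as it
        // holds 3 rows.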

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let timepoint1 = TimePoint::default()
            .with(timeline2, 1000)
            .with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline2, 1001)
            .with(timeline1, 43);
        let timepoint3 = TimePoint::default()
            .with(timeline2, 1002)
            .with(timeline1, 44);
        let timepoint4 = TimePoint::default()
            .with(timeline2, 1003)
            .with(timeline1, 45);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
        let points4 =
            MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;

        let components1 = [(MyPoints::descriptor_points(), points1.clone())];
        let components2 = [(MyPoints::descriptor_points(), points2.clone())];
        let components3 = [(MyPoints::descriptor_points(), points3.clone())];
        let components4 = [(MyPoints::descriptor_points(), points4.clone())];

        let row1 = PendingRow::new(timepoint4.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint1.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint2.clone(), components3.into_iter().collect());
        let row4 = PendingRow::new(timepoint3.clone(), components4.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());
        batcher.push_row(entity_path1.clone(), row4.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(false), timeline1, vec![45, 42, 43].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001].into()),
                ),
            ];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

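        // The leftover fourth row forms its own chunk; a single row is trivially
        // sorted, hence `Some(true)` below.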
        {
            let expected_row_ids = vec![row4.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(true), timeline1, vec![44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(true), timeline2, vec![1002].into()),
                ),
            ];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }
}