// re_chunk/batcher.rs

1use std::hash::{Hash as _, Hasher as _};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
6use nohash_hasher::IntMap;
7use re_arrow_util::arrays_to_list_array_opt;
8use re_byte_size::SizeBytes as _;
9use re_log_types::{AbsoluteTimeRange, EntityPath, TimeInt, TimePoint, Timeline, TimelineName};
10use re_quota_channel::{Receiver, Sender};
11use re_types_core::{ComponentIdentifier, SerializedComponentBatch, SerializedComponentColumn};
12
13use crate::chunk::ChunkComponents;
14use crate::{Chunk, ChunkId, ChunkResult, RowId, TimeColumn};
15
16// ---
17
/// An error that can occur when flushing.
#[derive(Debug, thiserror::Error)]
pub enum BatcherFlushError {
    /// The batching thread shut down before the flush could complete.
    #[error("Batcher stopped before flushing completed")]
    Closed,

    /// The flush did not propagate within the caller-provided timeout.
    #[error("Batcher flush timed out - not all messages were sent.")]
    Timeout,
}
27
/// Errors that can occur when creating/manipulating a [`ChunkBatcher`].
#[derive(thiserror::Error, Debug)]
pub enum ChunkBatcherError {
    /// Error when parsing configuration from environment.
    ///
    /// See [`ChunkBatcherConfig::apply_env`].
    #[error("Failed to parse config: '{name}={value}': {err}")]
    ParseConfig {
        name: &'static str,
        value: String,
        err: Box<dyn std::error::Error + Send + Sync>,
    },

    /// Error spawning one of the background threads.
    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        name: &'static str,
        err: Box<dyn std::error::Error + Send + Sync>,
    },
}
46
/// Convenience alias for fallible [`ChunkBatcher`] operations.
pub type ChunkBatcherResult<T> = Result<T, ChunkBatcherError>;
48
/// Callbacks you can install on the [`ChunkBatcher`].
///
/// All hooks are optional; [`BatcherHooks::NONE`] installs nothing.
#[derive(Clone, Default)]
pub struct BatcherHooks {
    /// Called when a new row arrives.
    ///
    /// The callback is given the slice of all rows not yet batched,
    /// including the new one.
    ///
    /// Used for testing.
    #[expect(clippy::type_complexity)]
    pub on_insert: Option<Arc<dyn Fn(&[PendingRow]) + Send + Sync>>,

    /// Called when the batcher's configuration changes.
    ///
    /// Called for initial configuration as well as subsequent changes.
    /// Used for testing.
    #[expect(clippy::type_complexity)]
    pub on_config_change: Option<Arc<dyn Fn(&ChunkBatcherConfig) + Send + Sync>>,

    /// Callback to be run when an Arrow Chunk goes out of scope.
    ///
    /// See [`re_log_types::ArrowRecordBatchReleaseCallback`] for more information.
    //
    // TODO(#6412): probably don't need this anymore.
    pub on_release: Option<re_log_types::ArrowRecordBatchReleaseCallback>,
}
75
impl BatcherHooks {
    /// No hooks at all: no insert callback, no config-change callback, no release callback.
    pub const NONE: Self = Self {
        on_insert: None,
        on_config_change: None,
        on_release: None,
    };
}
83
84impl PartialEq for BatcherHooks {
85    fn eq(&self, other: &Self) -> bool {
86        let Self {
87            on_insert,
88            on_config_change,
89            on_release,
90        } = self;
91
92        let on_insert_eq = match (on_insert, &other.on_insert) {
93            (Some(a), Some(b)) => Arc::ptr_eq(a, b),
94            (None, None) => true,
95            _ => false,
96        };
97
98        let on_config_change_eq = match (on_config_change, &other.on_config_change) {
99            (Some(a), Some(b)) => Arc::ptr_eq(a, b),
100            (None, None) => true,
101            _ => false,
102        };
103
104        on_insert_eq && on_config_change_eq && on_release == &other.on_release
105    }
106}
107
108impl std::fmt::Debug for BatcherHooks {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        let Self {
111            on_insert,
112            on_config_change,
113            on_release,
114        } = self;
115        f.debug_struct("BatcherHooks")
116            .field("on_insert", &on_insert.as_ref().map(|_| "…"))
117            .field("on_config_change", &on_config_change.as_ref().map(|_| "…"))
118            .field("on_release", &on_release)
119            .finish()
120    }
121}
122
123// ---
124
/// Defines the different thresholds of the associated [`ChunkBatcher`].
///
/// See [`Self::default`] and [`Self::from_env`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ChunkBatcherConfig {
    /// Duration of the periodic tick.
    //
    // NOTE: We use `std::time` directly because this library has to deal with `crossbeam` as well
    // as std threads, which both expect standard types anyway.
    //
    // TODO(cmc): Add support for burst debouncing.
    pub flush_tick: Duration,

    /// Flush if the accumulated payload has a size in bytes equal or greater than this.
    ///
    /// The resulting [`Chunk`] might be larger than `flush_num_bytes`!
    pub flush_num_bytes: u64,

    /// Flush if the accumulated payload has a number of rows equal or greater than this.
    pub flush_num_rows: u64,

    /// Split a chunk if it contains a number of rows equal or greater than this threshold and
    /// one or more of its timelines are unsorted.
    pub chunk_max_rows_if_unsorted: u64,

    /// The maximum number of bytes allowed to be in a queue/channel
    /// before we apply backpressure.
    ///
    /// This is divided in two by the input and output channels.
    ///
    /// If a single chunk exceeds this size it will still be processed.
    pub max_bytes_in_flight: u64,
}
158
impl Default for ChunkBatcherConfig {
    /// Same as [`ChunkBatcherConfig::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}
164
impl ChunkBatcherConfig {
    /// Default configuration, applicable to most use cases.
    pub const DEFAULT: Self = Self {
        flush_tick: Duration::from_millis(200),
        flush_num_bytes: 1024 * 1024, // 1 MiB
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        max_bytes_in_flight: 100 * 1024 * 1024, // Apply backpressure
    };

    /// Low-latency configuration, preferred when streaming directly to a viewer.
    pub const LOW_LATENCY: Self = Self {
        flush_tick: Duration::from_millis(8), // We want it fast enough for 60 Hz for real time camera feel
        ..Self::DEFAULT
    };

    /// Always flushes ASAP.
    //
    // NOTE(review): the periodic tick is effectively disabled (`Duration::MAX`);
    // immediate flushing is driven by the zero row/byte thresholds instead.
    pub const ALWAYS: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: 0,
        flush_num_rows: 0,
        chunk_max_rows_if_unsorted: 256,
        ..Self::DEFAULT
    };

    /// Never flushes unless manually told to (or hitting one the builtin invariants).
    pub const NEVER: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: u64::MAX,
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        ..Self::DEFAULT
    };

    /// Environment variable to configure [`Self::flush_tick`].
    pub const ENV_FLUSH_TICK: &'static str = "RERUN_FLUSH_TICK_SECS";

    /// Environment variable to configure [`Self::flush_num_bytes`].
    pub const ENV_FLUSH_NUM_BYTES: &'static str = "RERUN_FLUSH_NUM_BYTES";

    /// Environment variable to configure [`Self::flush_num_rows`].
    pub const ENV_FLUSH_NUM_ROWS: &'static str = "RERUN_FLUSH_NUM_ROWS";

    /// Environment variable to configure [`Self::chunk_max_rows_if_unsorted`].
    //
    // NOTE: Shared with the same env-var on the store side, for consistency.
    pub const ENV_CHUNK_MAX_ROWS_IF_UNSORTED: &'static str = "RERUN_CHUNK_MAX_ROWS_IF_UNSORTED";

    /// Environment variable to configure [`Self::chunk_max_rows_if_unsorted`].
    #[deprecated(note = "use `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` instead")]
    const ENV_MAX_CHUNK_ROWS_IF_UNSORTED: &'static str = "RERUN_MAX_CHUNK_ROWS_IF_UNSORTED";

    /// Creates a new `ChunkBatcherConfig` using the default values, optionally overridden
    /// through the environment.
    ///
    /// See [`Self::apply_env`].
    #[inline]
    pub fn from_env() -> ChunkBatcherResult<Self> {
        Self::default().apply_env()
    }

    /// Returns a copy of `self`, overriding existing fields with values from the environment if
    /// they are present.
    ///
    /// See [`Self::ENV_FLUSH_TICK`], [`Self::ENV_FLUSH_NUM_BYTES`], [`Self::ENV_FLUSH_NUM_ROWS`]
    /// and [`Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED`].
    ///
    /// # Errors
    ///
    /// Returns [`ChunkBatcherError::ParseConfig`] if any of these environment variables is set
    /// but cannot be parsed.
    pub fn apply_env(&self) -> ChunkBatcherResult<Self> {
        let mut new = *self;

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_TICK) {
            // The tick is specified as (possibly fractional) seconds, e.g. "0.3".
            let flush_duration_secs: f64 =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_TICK,
                    value: s.clone(),
                    err: Box::new(err),
                })?;

            // `try_from_secs_f64` rejects negative, non-finite and overflowing values.
            new.flush_tick = Duration::try_from_secs_f64(flush_duration_secs).map_err(|err| {
                ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_TICK,
                    value: s.clone(),
                    err: Box::new(err),
                }
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) {
            if let Some(num_bytes) = re_format::parse_bytes(&s) {
                // e.g. "10MB"
                new.flush_num_bytes = num_bytes.unsigned_abs();
            } else {
                // Assume it's just an integer
                new.flush_num_bytes = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_NUM_BYTES,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
            }
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_ROWS) {
            new.flush_num_rows = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                name: Self::ENV_FLUSH_NUM_ROWS,
                value: s.clone(),
                err: Box::new(err),
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        // Deprecated.
        // Parsed after the non-deprecated variable, so if both are set the deprecated
        // one takes precedence (covered by the `chunk_batcher_config` test below).
        #[expect(deprecated)]
        if let Ok(s) = std::env::var(Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        Ok(new)
    }
}
295
#[test]
fn chunk_batcher_config() {
    #![expect(unsafe_code)] // It's only a test

    // Detect breaking changes in our environment variables.
    // SAFETY: it's a test
    unsafe {
        std::env::set_var("RERUN_FLUSH_TICK_SECS", "0.3");
        std::env::set_var("RERUN_FLUSH_NUM_BYTES", "42");
        std::env::set_var("RERUN_FLUSH_NUM_ROWS", "666");
        std::env::set_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED", "7777");
    }

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 7777,
        ..Default::default()
    };
    assert_eq!(expected, config);

    // The deprecated env-var is parsed last in `apply_env`, so when both are set
    // it overrides the value from `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED`.
    // SAFETY: it's a test
    unsafe {
        std::env::set_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED", "9999");
    }

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 9999,
        ..Default::default()
    };
    assert_eq!(expected, config);
}
334
335// ---
336
/// Implements an asynchronous batcher that coalesces [`PendingRow`]s into [`Chunk`]s based upon
/// the thresholds defined in the associated [`ChunkBatcherConfig`].
///
/// ## Batching vs. splitting
///
/// The batching process is triggered solely by time and space thresholds -- whichever is hit first.
/// This process will result in one big dataframe.
///
/// The splitting process will then run on top of that big dataframe, and split it further down
/// into smaller [`Chunk`]s.
/// Specifically, the dataframe will be split into enough [`Chunk`]s so as to guarantee that:
/// * no chunk contains data for more than one entity path
/// * no chunk contains rows with different sets of timelines
/// * no chunk uses more than one datatype for a given component
/// * no chunk contains more rows than a pre-configured threshold if one or more timelines are unsorted
///
/// ## Multithreading and ordering
///
/// [`ChunkBatcher`] can be cheaply cloned and used freely across any number of threads.
///
/// Internally, all operations are linearized into a pipeline:
/// - All operations sent by a given thread will take effect in the same exact order as that
///   thread originally sent them in, from its point of view.
/// - There isn't any well defined global order across multiple threads.
///
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all
/// previous data sent by the calling thread has been batched and sent down the channel returned
/// by [`ChunkBatcher::chunks`]; no more, no less.
///
/// ## Shutdown
///
/// The batcher can only be shutdown by dropping all instances of it, at which point it will
/// automatically take care of flushing any pending data that might remain in the pipeline.
///
/// Shutting down cannot ever block.
#[derive(Clone)]
pub struct ChunkBatcher {
    // All clones share the same pipeline; the batching thread shuts down once the
    // last `Arc` is dropped (see `ChunkBatcherInner::drop`).
    inner: Arc<ChunkBatcherInner>,
}
376
// NOTE: The receiving end of the command stream as well as the sending end of the chunk stream are
// owned solely by the batching thread.
struct ChunkBatcherInner {
    /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed,
    /// therefore the `Drop` implementation is guaranteed that no more data can come in while it's
    /// running.
    tx_cmds: Sender<Command>,

    /// Receiving end of the chunk stream, cloned out by [`ChunkBatcher::chunks`].
    //
    // NOTE: Option so we can make shutdown non-blocking even with bounded channels.
    rx_chunks: Option<Receiver<Chunk>>,

    /// Handle of the batching thread, joined on drop.
    cmds_to_chunks_handle: Option<std::thread::JoinHandle<()>>,
}
388
389impl Drop for ChunkBatcherInner {
390    fn drop(&mut self) {
391        // Drop the receiving end of the chunk stream first and foremost, so that we don't block
392        // even if the output channel is bounded and currently full.
393        if let Some(rx_chunks) = self.rx_chunks.take()
394            && !rx_chunks.is_empty()
395        {
396            re_log::warn!("Dropping data");
397        }
398
399        // NOTE: The command channel is private, if we're here, nothing is currently capable of
400        // sending data down the pipeline.
401        self.tx_cmds.send(Command::Shutdown).ok();
402        if let Some(handle) = self.cmds_to_chunks_handle.take() {
403            handle.join().ok();
404        }
405    }
406}
407
/// The messages that travel down the (private) command channel into the batching thread.
enum Command {
    /// Forward a fully formed chunk as-is, bypassing the row accumulators.
    AppendChunk(Chunk),

    /// Accumulate a single row for the given entity path.
    AppendRow(EntityPath, PendingRow),

    /// Flush all accumulators; `on_done` is signalled once the flush has propagated.
    Flush {
        on_done: crossbeam::channel::Sender<()>,
    },

    /// Adopt a new configuration (as far as possible).
    UpdateConfig(ChunkBatcherConfig),

    /// Terminate the batching thread.
    Shutdown,
}
417
418impl re_byte_size::SizeBytes for Command {
419    fn heap_size_bytes(&self) -> u64 {
420        match self {
421            Self::AppendChunk(chunk) => chunk.heap_size_bytes(),
422            Self::AppendRow(_, row) => row.heap_size_bytes(),
423            Self::Flush { .. } | Self::UpdateConfig(_) | Self::Shutdown => 0,
424        }
425    }
426}
427
428impl Command {
429    fn flush() -> (Self, crossbeam::channel::Receiver<()>) {
430        let (tx, rx) = crossbeam::channel::bounded(1); // oneshot
431        (Self::Flush { on_done: tx }, rx)
432    }
433}
434
impl ChunkBatcher {
    /// Creates a new [`ChunkBatcher`] using the passed in `config`.
    ///
    /// The returned object must be kept in scope: dropping it will trigger a clean shutdown of the
    /// batcher.
    ///
    /// # Errors
    ///
    /// Returns [`ChunkBatcherError::SpawnThread`] if the batching thread cannot be spawned.
    #[must_use = "Batching threads will automatically shutdown when this object is dropped"]
    pub fn new(config: ChunkBatcherConfig, hooks: BatcherHooks) -> ChunkBatcherResult<Self> {
        // The byte budget is split evenly between the input (commands) and the
        // output (chunks) channels.
        let (tx_cmds, rx_cmd) =
            re_quota_channel::channel("batcher_input", config.max_bytes_in_flight / 2);
        let (tx_chunk, rx_chunks) =
            re_quota_channel::channel("batcher_output", config.max_bytes_in_flight / 2);

        let cmds_to_chunks_handle = {
            const NAME: &str = "ChunkBatcher::cmds_to_chunks";
            std::thread::Builder::new()
                .name(NAME.into())
                .spawn(move || batching_thread(config, hooks, rx_cmd, tx_chunk))
                .map_err(|err| ChunkBatcherError::SpawnThread {
                    name: NAME,
                    err: Box::new(err),
                })?
        };

        re_log::debug!(?config, "creating new chunk batcher");

        let inner = ChunkBatcherInner {
            tx_cmds,
            rx_chunks: Some(rx_chunks),
            cmds_to_chunks_handle: Some(cmds_to_chunks_handle),
        };

        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    // --- Send commands ---

    /// Pushes a fully formed [`Chunk`] down the pipeline, bypassing the batching logic.
    pub fn push_chunk(&self, chunk: Chunk) {
        self.inner.push_chunk(chunk);
    }

    /// Pushes a [`PendingRow`] down the batching pipeline.
    ///
    /// This will compute the size of the row from the batching thread!
    ///
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
    #[inline]
    pub fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.inner.push_row(entity_path, row);
    }

    /// Initiates a flush of the pipeline and returns immediately.
    ///
    /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
    #[inline]
    pub fn flush_async(&self) {
        self.inner.flush_async();
    }

    /// Initiates a flush of the batching pipeline and waits for it to propagate.
    ///
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
    ///
    /// # Errors
    ///
    /// Returns [`BatcherFlushError::Timeout`] if the flush does not complete within `timeout`,
    /// or [`BatcherFlushError::Closed`] if the batching thread has shut down.
    #[inline]
    pub fn flush_blocking(&self, timeout: Duration) -> Result<(), BatcherFlushError> {
        self.inner.flush_blocking(timeout)
    }

    /// Updates the batcher's configuration as far as possible.
    ///
    /// Note that `max_bytes_in_flight` cannot be changed after creation (the batching
    /// thread will warn and keep the original value).
    pub fn update_config(&self, config: ChunkBatcherConfig) {
        self.inner.update_config(config);
    }

    // --- Subscribe to chunks ---

    /// Returns a _shared_ channel in which are sent the batched [`Chunk`]s.
    ///
    /// Shutting down the batcher will close this channel.
    ///
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
    pub fn chunks(&self) -> Receiver<Chunk> {
        // NOTE: `rx_chunks` is only ever taken when the batcher as a whole is dropped, at which
        // point it is impossible to call this method.
        #[expect(clippy::unwrap_used)]
        self.inner.rx_chunks.clone().unwrap()
    }
}
523
524impl ChunkBatcherInner {
525    fn push_chunk(&self, chunk: Chunk) {
526        self.send_cmd(Command::AppendChunk(chunk));
527    }
528
529    fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
530        self.send_cmd(Command::AppendRow(entity_path, row));
531    }
532
533    fn flush_async(&self) {
534        let (flush_cmd, _) = Command::flush();
535        self.send_cmd(flush_cmd);
536    }
537
538    fn flush_blocking(&self, timeout: Duration) -> Result<(), BatcherFlushError> {
539        use crossbeam::channel::RecvTimeoutError;
540
541        let (flush_cmd, on_done) = Command::flush();
542        self.send_cmd(flush_cmd);
543
544        on_done.recv_timeout(timeout).map_err(|err| match err {
545            RecvTimeoutError::Timeout => BatcherFlushError::Timeout,
546            RecvTimeoutError::Disconnected => BatcherFlushError::Closed,
547        })
548    }
549
550    fn update_config(&self, config: ChunkBatcherConfig) {
551        self.send_cmd(Command::UpdateConfig(config));
552    }
553
554    fn send_cmd(&self, cmd: Command) {
555        // NOTE: Internal channels can never be closed outside of the `Drop` impl, this cannot
556        // fail.
557        self.tx_cmds.send(cmd).ok();
558    }
559}
560
/// The background thread driving a [`ChunkBatcher`].
///
/// It solely owns the receiving end of the command channel and the sending end of the chunk
/// channel. Rows are accumulated per entity path, and flushed into [`Chunk`]s whenever a
/// row/byte threshold is hit, on explicit flush commands, on periodic ticks, and one last time
/// on shutdown.
#[expect(clippy::needless_pass_by_value)]
fn batching_thread(
    mut config: ChunkBatcherConfig,
    hooks: BatcherHooks,
    rx_cmd: Receiver<Command>,
    tx_chunk: Sender<Chunk>,
) {
    let mut rx_tick = crossbeam::channel::tick(config.flush_tick);

    // Everything accumulated so far for a single entity path.
    struct Accumulator {
        // When this accumulator was created or last reset.
        latest: Instant,
        entity_path: EntityPath,
        pending_rows: Vec<PendingRow>,
        // Running total of `total_size_bytes()` over `pending_rows`.
        pending_num_bytes: u64,
    }

    impl Accumulator {
        fn new(entity_path: EntityPath) -> Self {
            Self {
                entity_path,
                latest: Instant::now(),
                pending_rows: Default::default(),
                pending_num_bytes: Default::default(),
            }
        }

        fn reset(&mut self) {
            self.latest = Instant::now();
            self.pending_rows.clear();
            self.pending_num_bytes = 0;
        }
    }

    // One accumulator per entity path, since a chunk cannot span multiple entity paths.
    let mut accs: IntMap<EntityPath, Accumulator> = IntMap::default();

    fn do_push_row(acc: &mut Accumulator, row: PendingRow) {
        acc.pending_num_bytes += row.total_size_bytes();
        acc.pending_rows.push(row);
    }

    // Drains the accumulator, splits its rows into valid chunks, and sends them downstream.
    fn do_flush_all(
        acc: &mut Accumulator,
        tx_chunk: &Sender<Chunk>,
        reason: &str,
        chunk_max_rows_if_unsorted: u64,
    ) {
        let rows = std::mem::take(&mut acc.pending_rows);
        if rows.is_empty() {
            return;
        }

        re_log::trace!(
            "Flushing {} rows and {} bytes. Reason: {reason}",
            rows.len(),
            re_format::format_bytes(acc.pending_num_bytes as _)
        );

        let chunks =
            PendingRow::many_into_chunks(acc.entity_path.clone(), chunk_max_rows_if_unsorted, rows);
        for chunk in chunks {
            let chunk = match chunk {
                Ok(chunk) => chunk,
                Err(err) => {
                    re_log::error!(%err, "corrupt chunk detected, dropping");
                    continue;
                }
            };

            // NOTE: This can only fail if all receivers have been dropped, which simply cannot happen
            // as long the batching thread is alive… which is where we currently are.

            if !chunk.components.is_empty() {
                // make sure the chunk didn't contain *only* indicators!
                tx_chunk.send(chunk).ok();
            } else {
                re_log::warn_once!(
                    "Dropping chunk without components. Entity path: {}",
                    chunk.entity_path()
                );
            }
        }

        acc.reset();
    }

    re_log::trace!(
        "Flushing every: {:.2}s, {} rows, {}",
        config.flush_tick.as_secs_f64(),
        config.flush_num_rows,
        re_format::format_bytes(config.flush_num_bytes as _),
    );
    // Signal initial config
    if let Some(on_config_change) = hooks.on_config_change.as_ref() {
        on_config_change(&config);
    }

    // Set to `true` when a flush is triggered for a reason other than hitting the time threshold,
    // so that the next tick will not unnecessarily fire early.
    let mut skip_next_tick = false;

    loop {
        crossbeam::select! {
            recv(rx_cmd.inner()) -> cmd => {
                let Ok(cmd) = cmd else {
                    // All command senders are gone, which can only happen if the
                    // `ChunkBatcher` itself has been dropped.
                    break;
                };

                let re_quota_channel::SizedMessage { msg, size_bytes } = cmd;

                // NOTE(review): presumably credits `size_bytes` back to the input channel's
                // quota, since we received via the raw `inner()` channel — confirm against
                // `re_quota_channel`.
                rx_cmd.manual_on_receive(size_bytes);

                match msg {
                    Command::AppendChunk(chunk) => {
                        // NOTE: This can only fail if all receivers have been dropped, which simply cannot happen
                        // as long the batching thread is alive… which is where we currently are.

                        if !chunk.components.is_empty() {
                            // make sure the chunk didn't contain *only* indicators!
                            tx_chunk.send(chunk).ok();
                        } else {
                            re_log::warn_once!(
                                "Dropping chunk without components. Entity path: {}",
                                chunk.entity_path()
                            );
                        }
                    },
                    Command::AppendRow(entity_path, row) => {
                        let acc = accs.entry(entity_path.clone())
                            .or_insert_with(|| Accumulator::new(entity_path));
                        do_push_row(acc, row);

                        if let Some(config) = hooks.on_insert.as_ref() {
                            config(&acc.pending_rows);
                        }

                        // Row threshold takes precedence over the byte threshold.
                        if acc.pending_rows.len() as u64 >= config.flush_num_rows {
                            do_flush_all(acc, &tx_chunk, "rows", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        } else if acc.pending_num_bytes >= config.flush_num_bytes {
                            do_flush_all(acc, &tx_chunk, "bytes", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        }
                    },

                    Command::Flush{ on_done } => {
                        skip_next_tick = true;
                        for acc in accs.values_mut() {
                            do_flush_all(acc, &tx_chunk, "manual", config.chunk_max_rows_if_unsorted);
                        }
                        on_done.send(()).ok();
                    },

                    Command::UpdateConfig(new_config) => {
                        // Warn if properties changed that we can't change here.
                        if config.max_bytes_in_flight != new_config.max_bytes_in_flight {
                            re_log::warn!("Cannot change max_bytes_in_flight after batcher has been created. \
                                Previous max_bytes_in_flight: {:?}, new max_bytes_in_flight: {:?}",
                                config.max_bytes_in_flight, new_config.max_bytes_in_flight);
                        }

                        re_log::trace!("Updated batcher config: {:?}", new_config);
                        if let Some(on_config_change) = hooks.on_config_change.as_ref() {
                            on_config_change(&new_config);
                        }

                        config = new_config;
                        // Restart the ticker so the new flush interval takes effect immediately.
                        rx_tick = crossbeam::channel::tick(config.flush_tick);
                    }

                    Command::Shutdown => break,
                }
            },

            recv(rx_tick) -> _ => {
                if skip_next_tick {
                    skip_next_tick = false;
                } else {
                    // TODO(cmc): It would probably be better to have a ticker per entity path. Maybe. At some point.
                    for acc in accs.values_mut() {
                        do_flush_all(acc, &tx_chunk, "tick", config.chunk_max_rows_if_unsorted);
                    }
                }
            },
        };
    }

    drop(rx_cmd);
    // Final flush: nothing still pending may be lost on shutdown.
    for acc in accs.values_mut() {
        do_flush_all(
            acc,
            &tx_chunk,
            "shutdown",
            config.chunk_max_rows_if_unsorted,
        );
    }
    drop(tx_chunk);

    // NOTE: The receiving end of the command stream as well as the sending end of the chunk
    // stream are owned solely by this thread.
    // Past this point, all command writes and all chunk reads will return `ErrDisconnected`.
}
764
765// ---
766
/// A single row's worth of data (i.e. a single log call).
///
/// Send those to the batcher to build up a [`Chunk`].
#[derive(Debug, Clone)]
pub struct PendingRow {
    /// Auto-generated `TUID`, uniquely identifying this event and keeping track of the client's
    /// wall-clock.
    pub row_id: RowId,

    /// User-specified [`TimePoint`] for this event.
    pub timepoint: TimePoint,

    /// The component data.
    ///
    /// Each array is a single component, i.e. _not_ a list array.
    pub components: IntMap<ComponentIdentifier, SerializedComponentBatch>,
}
784
785impl PendingRow {
786    #[inline]
787    pub fn new(
788        timepoint: TimePoint,
789        components: IntMap<ComponentIdentifier, SerializedComponentBatch>,
790    ) -> Self {
791        Self {
792            row_id: RowId::new(),
793            timepoint,
794            components,
795        }
796    }
797
798    #[inline]
799    pub fn from_iter(
800        timepoint: TimePoint,
801        components: impl IntoIterator<Item = SerializedComponentBatch>,
802    ) -> Self {
803        Self::new(
804            timepoint,
805            components
806                .into_iter()
807                .map(|component| (component.descriptor.component, component))
808                .collect(),
809        )
810    }
811}
812
813impl re_byte_size::SizeBytes for PendingRow {
814    #[inline]
815    fn heap_size_bytes(&self) -> u64 {
816        let Self {
817            row_id,
818            timepoint,
819            components,
820        } = self;
821
822        row_id.heap_size_bytes() + timepoint.heap_size_bytes() + components.heap_size_bytes()
823    }
824}
825
impl PendingRow {
    /// Turn a single row into a [`Chunk`] of its own.
    ///
    /// That's very wasteful, probably don't do that outside of testing, or unless you have very
    /// good reasons to.
    ///
    /// See also [`Self::many_into_chunks`].
    pub fn into_chunk(self, entity_path: EntityPath) -> ChunkResult<Chunk> {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        // One single-entry time column per timeline in the timepoint.
        // A single row is trivially sorted, hence `Some(true)`.
        let timelines = timepoint
            .into_iter()
            .map(|(timeline_name, cell)| {
                let times = ArrowScalarBuffer::from(vec![cell.as_i64()]);
                let time_column =
                    TimeColumn::new(Some(true), Timeline::new(timeline_name, cell.typ()), times);
                (timeline_name, time_column)
            })
            .collect();

        // Wrap each component array into a one-row list array.
        let mut per_desc = ChunkComponents::default();
        for (_component, batch) in components {
            let list_array = arrays_to_list_array_opt(&[Some(&*batch.array as _)]);
            if let Some(list_array) = list_array {
                per_desc.insert(SerializedComponentColumn::new(list_array, batch.descriptor));
            }
        }

        Chunk::from_native_row_ids(
            ChunkId::new(),
            entity_path,
            Some(true),
            &[row_id],
            timelines,
            per_desc,
        )
    }

    /// This turns a batch of [`PendingRow`]s into a [`Chunk`].
    ///
    /// There are a lot of conditions to fulfill for a [`Chunk`] to be valid: this helper makes
    /// sure to fulfill all of them by splitting the chunk into one or more pieces as necessary.
    ///
    /// In particular, a [`Chunk`] cannot:
    /// * contain data for more than one entity path
    /// * contain rows with different sets of timelines
    /// * use more than one datatype for a given component
    /// * contain more rows than a pre-configured threshold if one or more timelines are unsorted
    //
    // TODO(cmc): there are lots of performance improvement opportunities in this one, but let's
    // see if that actually matters in practice first.
    pub fn many_into_chunks(
        entity_path: EntityPath,
        chunk_max_rows_if_unsorted: u64,
        mut rows: Vec<Self>,
    ) -> impl Iterator<Item = ChunkResult<Chunk>> {
        re_tracing::profile_function!();

        // First things first, sort all the rows by row ID -- that's our global order and it holds
        // true no matter what.
        {
            re_tracing::profile_scope!("sort rows");
            rows.sort_by_key(|row| row.row_id);
        }

        // Then organize the rows in micro batches -- one batch per unique set of timelines.
        let mut per_timeline_set: IntMap<u64 /* Timeline set */, Vec<Self>> = Default::default();
        {
            re_tracing::profile_scope!("compute timeline sets");

            // The hash is deterministic because the traversal of a `TimePoint` is itself
            // deterministic: `TimePoint` is backed by a `BTreeMap`.
            for row in rows {
                let mut hasher = ahash::AHasher::default();
                row.timepoint.timeline_names().for_each(|timeline| {
                    <TimelineName as std::hash::Hash>::hash(timeline, &mut hasher);
                });

                per_timeline_set
                    .entry(hasher.finish())
                    .or_default()
                    .push(row);
            }
        }

        per_timeline_set.into_values().flat_map(move |rows| {
            re_tracing::profile_scope!("iterate per timeline set");

            // Then we split the micro batches even further -- one sub-batch per unique set of datatypes.
            let mut per_datatype_set: IntMap<u64 /* ArrowDatatype set */, Vec<Self>> =
                Default::default();
            {
                re_tracing::profile_scope!("compute datatype sets");

                // The hash is dependent on the order in which the `PendingRow` was created (i.e.
                // the order in which its components were inserted).
                //
                // This is because the components are stored in a `IntMap`, which doesn't do any
                // hashing. For that reason, the traversal order of an `IntMap` is deterministic:
                // it's always the same for `IntMap` that share the same keys, as long as these
                // keys were inserted in the same order.
                // See `intmap_order_is_deterministic` in the tests below.
                //
                // In practice, the `PendingRow`s in a given program are always built in the same
                // order for the duration of that program, which is why this works.
                // See `simple_but_hashes_might_not_match` in the tests below.
                for row in rows {
                    let mut hasher = ahash::AHasher::default();
                    row.components
                        .values()
                        .for_each(|batch| batch.array.data_type().hash(&mut hasher));
                    per_datatype_set
                        .entry(hasher.finish())
                        .or_default()
                        .push(row);
                }
            }

            // And finally we can build the resulting chunks.
            let entity_path = entity_path.clone();
            per_datatype_set.into_values().flat_map(move |rows| {
                re_tracing::profile_scope!("iterate per datatype set");

                let mut row_ids: Vec<RowId> = Vec::with_capacity(rows.len());
                let mut timelines: IntMap<TimelineName, PendingTimeColumn> = IntMap::default();

                // Create all the logical list arrays that we're going to need, accounting for the
                // possibility of sparse components in the data.
                let mut all_components: IntMap<ComponentIdentifier, _> = IntMap::default();
                for row in &rows {
                    for (component, batch) in &row.components {
                        all_components
                            .entry(*component)
                            .or_insert_with(|| (batch.descriptor.clone(), Vec::new()));
                    }
                }

                let mut chunks = Vec::new();

                // Accumulation state: for each component, its descriptor plus one (possibly
                // `None`) array entry per row accumulated so far.
                let mut components = all_components.clone();
                for row in &rows {
                    let Self {
                        row_id,
                        timepoint: row_timepoint,
                        components: row_components,
                    } = row;

                    // Look for unsorted timelines -- if we find any, and the chunk is larger than
                    // the pre-configured `chunk_max_rows_if_unsorted` threshold, then split _even_
                    // further!
                    for (&timeline_name, cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });

                        if !row_ids.is_empty() // just being extra cautious
                            && row_ids.len() as u64 >= chunk_max_rows_if_unsorted
                            && !time_column.is_sorted
                        {
                            // Flush everything accumulated so far into a finished chunk, draining
                            // the accumulators (`row_ids`, `timelines`, `components`) in the process.
                            chunks.push(Chunk::from_native_row_ids(
                                ChunkId::new(),
                                entity_path.clone(),
                                Some(true),
                                &std::mem::take(&mut row_ids),
                                std::mem::take(&mut timelines)
                                    .into_iter()
                                    .map(|(name, time_column)| (name, time_column.finish()))
                                    .collect(),
                                {
                                    let mut per_component = ChunkComponents::default();
                                    for (_component, (desc, arrays)) in
                                        std::mem::take(&mut components)
                                    {
                                        let list_array = arrays_to_list_array_opt(&arrays);
                                        if let Some(list_array) = list_array {
                                            per_component.insert(SerializedComponentColumn::new(
                                                list_array, desc,
                                            ));
                                        }
                                    }
                                    per_component
                                },
                            ));

                            // Restart accumulation with fresh, empty per-component columns.
                            components = all_components.clone();
                        }
                    }

                    row_ids.push(*row_id);

                    for (&timeline_name, &cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });
                        time_column.push(cell.into());
                    }

                    for (component, (_desc, arrays)) in &mut components {
                        // NOTE: This will push `None` if the row doesn't actually hold a value for this
                        // component -- these are sparse list arrays!
                        arrays.push(
                            row_components
                                .get(component)
                                .map(|batch| &*batch.array as _),
                        );
                    }
                }

                // Flush whatever remains into one final chunk.
                chunks.push(Chunk::from_native_row_ids(
                    ChunkId::new(),
                    entity_path.clone(),
                    Some(true),
                    &std::mem::take(&mut row_ids),
                    timelines
                        .into_iter()
                        .map(|(timeline, time_column)| (timeline, time_column.finish()))
                        .collect(),
                    {
                        let mut per_desc = ChunkComponents::default();
                        for (_component, (desc, arrays)) in components {
                            let list_array = arrays_to_list_array_opt(&arrays);
                            if let Some(list_array) = list_array {
                                per_desc.insert(SerializedComponentColumn::new(list_array, desc));
                            }
                        }
                        per_desc
                    },
                ));

                chunks
            })
        })
    }
}
1064
/// Helper class used to buffer time data.
///
/// See [`PendingRow::many_into_chunks`] for usage.
struct PendingTimeColumn {
    /// The timeline this column belongs to.
    timeline: Timeline,

    /// Raw time values, in insertion order.
    times: Vec<i64>,

    /// Whether `times` is sorted in ascending order. Maintained incrementally by `push`.
    is_sorted: bool,

    /// Running min/max over all pushed times.
    time_range: AbsoluteTimeRange,
}
1074
1075impl PendingTimeColumn {
1076    fn new(timeline: Timeline) -> Self {
1077        Self {
1078            timeline,
1079            times: Default::default(),
1080            is_sorted: true,
1081            time_range: AbsoluteTimeRange::EMPTY,
1082        }
1083    }
1084
1085    /// Push a single time value at the end of this chunk.
1086    fn push(&mut self, time: TimeInt) {
1087        let Self {
1088            timeline: _,
1089            times,
1090            is_sorted,
1091            time_range,
1092        } = self;
1093
1094        *is_sorted &= times.last().copied().unwrap_or(TimeInt::MIN.as_i64()) <= time.as_i64();
1095        time_range.set_min(TimeInt::min(time_range.min(), time));
1096        time_range.set_max(TimeInt::max(time_range.max(), time));
1097        times.push(time.as_i64());
1098    }
1099
1100    fn finish(self) -> TimeColumn {
1101        let Self {
1102            timeline,
1103            times,
1104            is_sorted,
1105            time_range,
1106        } = self;
1107
1108        TimeColumn {
1109            timeline,
1110            times: ArrowScalarBuffer::from(times),
1111            is_sorted,
1112            time_range,
1113        }
1114    }
1115}
1116
1117// ---
1118
1119// NOTE:
1120// These tests only cover the chunk splitting conditions described in `many_into_chunks`.
1121// Temporal and spatial thresholds are already taken care of by the RecordingStream test suite.
1122
1123#[cfg(test)]
1124mod tests {
1125    use crossbeam::channel::TryRecvError;
1126    use re_log_types::example_components::{MyIndex, MyLabel, MyPoint, MyPoint64, MyPoints};
1127    use re_types_core::{ComponentDescriptor, Loggable as _};
1128
1129    use super::*;
1130
    /// A bunch of rows that don't fit any of the split conditions should end up together.
    #[test]
    fn simple() -> anyhow::Result<()> {
        // `NEVER` disables automatic flushing: chunks are only produced when the batcher is dropped.
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        // All rows share the same timeline set, component set, and insertion order, so none of the
        // split conditions in `many_into_chunks` applies.
        let components1 = [
            SerializedComponentBatch::new(points1.clone(), MyPoints::descriptor_points()),
            SerializedComponentBatch::new(labels1.clone(), MyPoints::descriptor_labels()),
            SerializedComponentBatch::new(indices1.clone(), MyIndex::partial_descriptor()),
        ];
        let components2 = [
            SerializedComponentBatch::new(points2.clone(), MyPoints::descriptor_points()),
            SerializedComponentBatch::new(labels2.clone(), MyPoints::descriptor_labels()),
            SerializedComponentBatch::new(indices2.clone(), MyIndex::partial_descriptor()),
        ];
        let components3 = [
            SerializedComponentBatch::new(points3.clone(), MyPoints::descriptor_points()),
            SerializedComponentBatch::new(labels3.clone(), MyPoints::descriptor_labels()),
            SerializedComponentBatch::new(indices3.clone(), MyIndex::partial_descriptor()),
        ];

        let row1 = PendingRow::from_iter(timepoint1.clone(), components1);
        let row2 = PendingRow::from_iter(timepoint2.clone(), components2);
        let row3 = PendingRow::from_iter(timepoint3.clone(), components3);

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        // Drain every produced chunk until the channel disconnects (batcher fully shut down).
        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 43, 44].into()),
            )];
            let expected_components = [
                (
                    MyPoints::descriptor_points(),
                    arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
                ), //
                (
                    MyPoints::descriptor_labels(),
                    arrays_to_list_array_opt(&[&*labels1, &*labels2, &*labels3].map(Some)).unwrap(),
                ), //
                (
                    MyIndex::partial_descriptor(),
                    arrays_to_list_array_opt(&[&*indices1, &*indices2, &*indices3].map(Some))
                        .unwrap(),
                ), //
            ];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }
1239
    /// Rows whose components were inserted in different orders *may* end up in different chunks,
    /// because the datatype-set hash depends on the `IntMap` traversal order.
    #[test]
    #[expect(clippy::len_zero)]
    fn simple_but_hashes_might_not_match() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        // NOTE: each row inserts its components in a different order, on purpose.
        let components1 = [
            SerializedComponentBatch::new(indices1.clone(), MyIndex::partial_descriptor()),
            SerializedComponentBatch::new(points1.clone(), MyPoints::descriptor_points()),
            SerializedComponentBatch::new(labels1.clone(), MyPoints::descriptor_labels()),
        ];
        let components2 = [
            SerializedComponentBatch::new(points2.clone(), MyPoints::descriptor_points()),
            SerializedComponentBatch::new(labels2.clone(), MyPoints::descriptor_labels()),
            SerializedComponentBatch::new(indices2.clone(), MyIndex::partial_descriptor()),
        ];
        let components3 = [
            SerializedComponentBatch::new(labels3.clone(), MyPoints::descriptor_labels()),
            SerializedComponentBatch::new(indices3.clone(), MyIndex::partial_descriptor()),
            SerializedComponentBatch::new(points3.clone(), MyPoints::descriptor_points()),
        ];

        let row1 = PendingRow::from_iter(timepoint1.clone(), components1);
        let row2 = PendingRow::from_iter(timepoint2.clone(), components2);
        let row3 = PendingRow::from_iter(timepoint3.clone(), components3);

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        // Drain every produced chunk until the channel disconnects.
        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        // The rows' components were inserted in different orders, and therefore the resulting
        // `IntMap`s *may* have different traversal orders, which ultimately means that the datatype
        // hashes *may* end up being different: i.e., possibly no batching.
        //
        // In practice, it's still possible to get lucky and end up with two maps that just happen
        // to share the same iteration order regardless, which is why this assertion is overly broad.
        // Try running this test with `--show-output`.
        assert!(chunks.len() >= 1);

        Ok(())
    }
1320
1321    #[test]
1322    #[expect(clippy::zero_sized_map_values)]
1323    fn intmap_order_is_deterministic() {
1324        let descriptors = [
1325            MyPoints::descriptor_points(),
1326            MyPoints::descriptor_colors(),
1327            MyPoints::descriptor_labels(),
1328            MyPoint64::partial_descriptor(),
1329            MyIndex::partial_descriptor(),
1330        ];
1331
1332        let expected: IntMap<ComponentDescriptor, ()> =
1333            descriptors.iter().cloned().map(|d| (d, ())).collect();
1334        let expected: Vec<_> = expected.into_keys().collect();
1335
1336        for _ in 0..1_000 {
1337            let got_collect: IntMap<ComponentDescriptor, ()> =
1338                descriptors.clone().into_iter().map(|d| (d, ())).collect();
1339            let got_collect: Vec<_> = got_collect.into_keys().collect();
1340
1341            let mut got_insert: IntMap<ComponentDescriptor, ()> = Default::default();
1342            for d in descriptors.clone() {
1343                got_insert.insert(d, ());
1344            }
1345            let got_insert: Vec<_> = got_insert.into_keys().collect();
1346
1347            assert_eq!(expected, got_collect);
1348            assert_eq!(expected, got_insert);
1349        }
1350    }
1351
    /// A bunch of static rows (empty timepoints, i.e. no timelines at all) that don't fit any of
    /// the split conditions should end up together.
    #[test]
    fn simple_static() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        // An empty timepoint means the data is static.
        let static_ = TimePoint::default();

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [SerializedComponentBatch::new(
            points1.clone(),
            MyPoints::descriptor_points(),
        )];
        let components2 = [SerializedComponentBatch::new(
            points2.clone(),
            MyPoints::descriptor_points(),
        )];
        let components3 = [SerializedComponentBatch::new(
            points3.clone(),
            MyPoints::descriptor_points(),
        )];

        let row1 = PendingRow::from_iter(static_.clone(), components1);
        let row2 = PendingRow::from_iter(static_.clone(), components2);
        let row3 = PendingRow::from_iter(static_.clone(), components3);

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        // Drain every produced chunk until the channel disconnects.
        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            // Static data: no time columns at all.
            let expected_timelines = [];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }
1431
    /// A bunch of rows belonging to different entities will end up in different batches.
    #[test]
    fn different_entities() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [SerializedComponentBatch::new(
            points1.clone(),
            MyPoints::descriptor_points(),
        )];
        let components2 = [SerializedComponentBatch::new(
            points2.clone(),
            MyPoints::descriptor_points(),
        )];
        let components3 = [SerializedComponentBatch::new(
            points3.clone(),
            MyPoints::descriptor_points(),
        )];

        let row1 = PendingRow::from_iter(timepoint1.clone(), components1);
        let row2 = PendingRow::from_iter(timepoint2.clone(), components2);
        let row3 = PendingRow::from_iter(timepoint3.clone(), components3);

        // Rows 1 & 3 go to `ent1`, row 2 goes to `ent2`: two separate chunks are expected.
        let entity_path1: EntityPath = "ent1".into();
        let entity_path2: EntityPath = "ent2".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path2.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        // Drain every produced chunk until the channel disconnects.
        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        {
            let expected_row_ids = vec![row2.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![43].into()),
            )];
            let expected_components = [(
                MyPoints::descriptor_points(),
                arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path2.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }
1543
1544    /// A bunch of rows with different sets of timelines will end up in different batches.
1545    #[test]
1546    fn different_timelines() -> anyhow::Result<()> {
1547        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;
1548
1549        let timeline1 = Timeline::new_duration("log_time");
1550        let timeline2 = Timeline::new_sequence("frame_nr");
1551
1552        let timepoint1 = TimePoint::default().with(timeline1, 42);
1553        let timepoint2 = TimePoint::default()
1554            .with(timeline1, 43)
1555            .with(timeline2, 1000);
1556        let timepoint3 = TimePoint::default()
1557            .with(timeline1, 44)
1558            .with(timeline2, 1001);
1559
1560        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1561        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
1562        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1563
1564        let components1 = [SerializedComponentBatch::new(
1565            points1.clone(),
1566            MyPoints::descriptor_points(),
1567        )];
1568        let components2 = [SerializedComponentBatch::new(
1569            points2.clone(),
1570            MyPoints::descriptor_points(),
1571        )];
1572        let components3 = [SerializedComponentBatch::new(
1573            points3.clone(),
1574            MyPoints::descriptor_points(),
1575        )];
1576
1577        let row1 = PendingRow::from_iter(timepoint1.clone(), components1);
1578        let row2 = PendingRow::from_iter(timepoint2.clone(), components2);
1579        let row3 = PendingRow::from_iter(timepoint3.clone(), components3);
1580
1581        let entity_path1: EntityPath = "a/b/c".into();
1582        batcher.push_row(entity_path1.clone(), row1.clone());
1583        batcher.push_row(entity_path1.clone(), row2.clone());
1584        batcher.push_row(entity_path1.clone(), row3.clone());
1585
1586        let chunks_rx = batcher.chunks();
1587        drop(batcher); // flush and close
1588
1589        let mut chunks = Vec::new();
1590        loop {
1591            let chunk = match chunks_rx.try_recv() {
1592                Ok(chunk) => chunk,
1593                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1594                Err(TryRecvError::Disconnected) => break,
1595            };
1596            chunks.push(chunk);
1597        }
1598
1599        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1600
1601        // Make the programmer's life easier if this test fails.
1602        eprintln!("Chunks:");
1603        for chunk in &chunks {
1604            eprintln!("{chunk}");
1605        }
1606
1607        assert_eq!(2, chunks.len());
1608
1609        {
1610            let expected_row_ids = vec![row1.row_id];
1611            let expected_timelines = [(
1612                *timeline1.name(),
1613                TimeColumn::new(Some(true), timeline1, vec![42].into()),
1614            )];
1615            let expected_components = [(
1616                MyPoints::descriptor_points(),
1617                arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(),
1618            )];
1619            let expected_chunk = Chunk::from_native_row_ids(
1620                chunks[0].id,
1621                entity_path1.clone(),
1622                None,
1623                &expected_row_ids,
1624                expected_timelines.into_iter().collect(),
1625                expected_components.into_iter().collect(),
1626            )?;
1627
1628            eprintln!("Expected:\n{expected_chunk}");
1629            eprintln!("Got:\n{}", chunks[0]);
1630            assert_eq!(expected_chunk, chunks[0]);
1631        }
1632
1633        {
1634            let expected_row_ids = vec![row2.row_id, row3.row_id];
1635            let expected_timelines = [
1636                (
1637                    *timeline1.name(),
1638                    TimeColumn::new(Some(true), timeline1, vec![43, 44].into()),
1639                ),
1640                (
1641                    *timeline2.name(),
1642                    TimeColumn::new(Some(true), timeline2, vec![1000, 1001].into()),
1643                ),
1644            ];
1645            let expected_components = [(
1646                MyPoints::descriptor_points(),
1647                arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(),
1648            )];
1649            let expected_chunk = Chunk::from_native_row_ids(
1650                chunks[1].id,
1651                entity_path1.clone(),
1652                None,
1653                &expected_row_ids,
1654                expected_timelines.into_iter().collect(),
1655                expected_components.into_iter().collect(),
1656            )?;
1657
1658            eprintln!("Expected:\n{expected_chunk}");
1659            eprintln!("Got:\n{}", chunks[1]);
1660            assert_eq!(expected_chunk, chunks[1]);
1661        }
1662
1663        Ok(())
1664    }
1665
1666    /// A bunch of rows with different datatypes will end up in different batches.
1667    #[test]
1668    fn different_datatypes() -> anyhow::Result<()> {
1669        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER, BatcherHooks::NONE)?;
1670
1671        let timeline1 = Timeline::new_duration("log_time");
1672
1673        let timepoint1 = TimePoint::default().with(timeline1, 42);
1674        let timepoint2 = TimePoint::default().with(timeline1, 43);
1675        let timepoint3 = TimePoint::default().with(timeline1, 44);
1676
1677        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1678        let points2 =
1679            MyPoint64::to_arrow([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?;
1680        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1681
1682        let components1 = [SerializedComponentBatch::new(
1683            points1.clone(),
1684            MyPoints::descriptor_points(),
1685        )];
1686        let components2 = [SerializedComponentBatch::new(
1687            points2.clone(),
1688            MyPoints::descriptor_points(),
1689        )]; // same name, different datatype
1690        let components3 = [SerializedComponentBatch::new(
1691            points3.clone(),
1692            MyPoints::descriptor_points(),
1693        )];
1694
1695        let row1 = PendingRow::from_iter(timepoint1.clone(), components1);
1696        let row2 = PendingRow::from_iter(timepoint2.clone(), components2);
1697        let row3 = PendingRow::from_iter(timepoint3.clone(), components3);
1698
1699        let entity_path1: EntityPath = "a/b/c".into();
1700        batcher.push_row(entity_path1.clone(), row1.clone());
1701        batcher.push_row(entity_path1.clone(), row2.clone());
1702        batcher.push_row(entity_path1.clone(), row3.clone());
1703
1704        let chunks_rx = batcher.chunks();
1705        drop(batcher); // flush and close
1706
1707        let mut chunks = Vec::new();
1708        loop {
1709            let chunk = match chunks_rx.try_recv() {
1710                Ok(chunk) => chunk,
1711                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1712                Err(TryRecvError::Disconnected) => break,
1713            };
1714            chunks.push(chunk);
1715        }
1716
1717        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1718
1719        // Make the programmer's life easier if this test fails.
1720        eprintln!("Chunks:");
1721        for chunk in &chunks {
1722            eprintln!("{chunk}");
1723        }
1724
1725        assert_eq!(2, chunks.len());
1726
1727        {
1728            let expected_row_ids = vec![row1.row_id, row3.row_id];
1729            let expected_timelines = [(
1730                *timeline1.name(),
1731                TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
1732            )];
1733            let expected_components = [(
1734                MyPoints::descriptor_points(),
1735                arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
1736            )];
1737            let expected_chunk = Chunk::from_native_row_ids(
1738                chunks[0].id,
1739                entity_path1.clone(),
1740                None,
1741                &expected_row_ids,
1742                expected_timelines.into_iter().collect(),
1743                expected_components.into_iter().collect(),
1744            )?;
1745
1746            eprintln!("Expected:\n{expected_chunk}");
1747            eprintln!("Got:\n{}", chunks[0]);
1748            assert_eq!(expected_chunk, chunks[0]);
1749        }
1750
1751        {
1752            let expected_row_ids = vec![row2.row_id];
1753            let expected_timelines = [(
1754                *timeline1.name(),
1755                TimeColumn::new(Some(true), timeline1, vec![43].into()),
1756            )];
1757            let expected_components = [(
1758                MyPoints::descriptor_points(),
1759                arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
1760            )];
1761            let expected_chunk = Chunk::from_native_row_ids(
1762                chunks[1].id,
1763                entity_path1.clone(),
1764                None,
1765                &expected_row_ids,
1766                expected_timelines.into_iter().collect(),
1767                expected_components.into_iter().collect(),
1768            )?;
1769
1770            eprintln!("Expected:\n{expected_chunk}");
1771            eprintln!("Got:\n{}", chunks[1]);
1772            assert_eq!(expected_chunk, chunks[1]);
1773        }
1774
1775        Ok(())
1776    }
1777
1778    /// If one or more of the timelines end up unsorted, but the batch is below the unsorted length
1779    /// threshold, we don't do anything special.
1780    #[test]
1781    fn unsorted_timeline_below_threshold() -> anyhow::Result<()> {
1782        let batcher = ChunkBatcher::new(
1783            ChunkBatcherConfig {
1784                chunk_max_rows_if_unsorted: 1000,
1785                ..ChunkBatcherConfig::NEVER
1786            },
1787            BatcherHooks::NONE,
1788        )?;
1789
1790        let timeline1 = Timeline::new_duration("log_time");
1791        let timeline2 = Timeline::new_duration("frame_nr");
1792
1793        let timepoint1 = TimePoint::default()
1794            .with(timeline2, 1000)
1795            .with(timeline1, 42);
1796        let timepoint2 = TimePoint::default()
1797            .with(timeline2, 1001)
1798            .with(timeline1, 43);
1799        let timepoint3 = TimePoint::default()
1800            .with(timeline2, 1002)
1801            .with(timeline1, 44);
1802        let timepoint4 = TimePoint::default()
1803            .with(timeline2, 1003)
1804            .with(timeline1, 45);
1805
1806        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1807        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
1808        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1809        let points4 =
1810            MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;
1811
1812        let components1 = [SerializedComponentBatch::new(
1813            points1.clone(),
1814            MyPoints::descriptor_points(),
1815        )];
1816        let components2 = [SerializedComponentBatch::new(
1817            points2.clone(),
1818            MyPoints::descriptor_points(),
1819        )];
1820        let components3 = [SerializedComponentBatch::new(
1821            points3.clone(),
1822            MyPoints::descriptor_points(),
1823        )];
1824        let components4 = [SerializedComponentBatch::new(
1825            points4.clone(),
1826            MyPoints::descriptor_points(),
1827        )];
1828
1829        let row1 = PendingRow::from_iter(timepoint4.clone(), components1);
1830        let row2 = PendingRow::from_iter(timepoint1.clone(), components2);
1831        let row3 = PendingRow::from_iter(timepoint2.clone(), components3);
1832        let row4 = PendingRow::from_iter(timepoint3.clone(), components4);
1833
1834        let entity_path1: EntityPath = "a/b/c".into();
1835        batcher.push_row(entity_path1.clone(), row1.clone());
1836        batcher.push_row(entity_path1.clone(), row2.clone());
1837        batcher.push_row(entity_path1.clone(), row3.clone());
1838        batcher.push_row(entity_path1.clone(), row4.clone());
1839
1840        let chunks_rx = batcher.chunks();
1841        drop(batcher); // flush and close
1842
1843        let mut chunks = Vec::new();
1844        loop {
1845            let chunk = match chunks_rx.try_recv() {
1846                Ok(chunk) => chunk,
1847                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1848                Err(TryRecvError::Disconnected) => break,
1849            };
1850            chunks.push(chunk);
1851        }
1852
1853        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1854
1855        // Make the programmer's life easier if this test fails.
1856        eprintln!("Chunks:");
1857        for chunk in &chunks {
1858            eprintln!("{chunk}");
1859        }
1860
1861        assert_eq!(1, chunks.len());
1862
1863        {
1864            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id, row4.row_id];
1865            let expected_timelines = [
1866                (
1867                    *timeline1.name(),
1868                    TimeColumn::new(Some(false), timeline1, vec![45, 42, 43, 44].into()),
1869                ),
1870                (
1871                    *timeline2.name(),
1872                    TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001, 1002].into()),
1873                ),
1874            ];
1875            let expected_components = [(
1876                MyPoints::descriptor_points(),
1877                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3, &*points4].map(Some))
1878                    .unwrap(),
1879            )];
1880            let expected_chunk = Chunk::from_native_row_ids(
1881                chunks[0].id,
1882                entity_path1.clone(),
1883                None,
1884                &expected_row_ids,
1885                expected_timelines.into_iter().collect(),
1886                expected_components.into_iter().collect(),
1887            )?;
1888
1889            eprintln!("Expected:\n{expected_chunk}");
1890            eprintln!("Got:\n{}", chunks[0]);
1891            assert_eq!(expected_chunk, chunks[0]);
1892        }
1893
1894        Ok(())
1895    }
1896
1897    /// If one or more of the timelines end up unsorted, and the batch is above the unsorted length
1898    /// threshold, we split it.
1899    #[test]
1900    fn unsorted_timeline_above_threshold() -> anyhow::Result<()> {
1901        let batcher = ChunkBatcher::new(
1902            ChunkBatcherConfig {
1903                chunk_max_rows_if_unsorted: 3,
1904                ..ChunkBatcherConfig::NEVER
1905            },
1906            BatcherHooks::NONE,
1907        )?;
1908
1909        let timeline1 = Timeline::new_duration("log_time");
1910        let timeline2 = Timeline::new_duration("frame_nr");
1911
1912        let timepoint1 = TimePoint::default()
1913            .with(timeline2, 1000)
1914            .with(timeline1, 42);
1915        let timepoint2 = TimePoint::default()
1916            .with(timeline2, 1001)
1917            .with(timeline1, 43);
1918        let timepoint3 = TimePoint::default()
1919            .with(timeline2, 1002)
1920            .with(timeline1, 44);
1921        let timepoint4 = TimePoint::default()
1922            .with(timeline2, 1003)
1923            .with(timeline1, 45);
1924
1925        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1926        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
1927        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1928        let points4 =
1929            MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;
1930
1931        let components1 = [SerializedComponentBatch::new(
1932            points1.clone(),
1933            MyPoints::descriptor_points(),
1934        )];
1935        let components2 = [SerializedComponentBatch::new(
1936            points2.clone(),
1937            MyPoints::descriptor_points(),
1938        )];
1939        let components3 = [SerializedComponentBatch::new(
1940            points3.clone(),
1941            MyPoints::descriptor_points(),
1942        )];
1943        let components4 = [SerializedComponentBatch::new(
1944            points4.clone(),
1945            MyPoints::descriptor_points(),
1946        )];
1947
1948        let row1 = PendingRow::from_iter(timepoint4.clone(), components1);
1949        let row2 = PendingRow::from_iter(timepoint1.clone(), components2);
1950        let row3 = PendingRow::from_iter(timepoint2.clone(), components3);
1951        let row4 = PendingRow::from_iter(timepoint3.clone(), components4);
1952
1953        let entity_path1: EntityPath = "a/b/c".into();
1954        batcher.push_row(entity_path1.clone(), row1.clone());
1955        batcher.push_row(entity_path1.clone(), row2.clone());
1956        batcher.push_row(entity_path1.clone(), row3.clone());
1957        batcher.push_row(entity_path1.clone(), row4.clone());
1958
1959        let chunks_rx = batcher.chunks();
1960        drop(batcher); // flush and close
1961
1962        let mut chunks = Vec::new();
1963        loop {
1964            let chunk = match chunks_rx.try_recv() {
1965                Ok(chunk) => chunk,
1966                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1967                Err(TryRecvError::Disconnected) => break,
1968            };
1969            chunks.push(chunk);
1970        }
1971
1972        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1973
1974        // Make the programmer's life easier if this test fails.
1975        eprintln!("Chunks:");
1976        for chunk in &chunks {
1977            eprintln!("{chunk}");
1978        }
1979
1980        assert_eq!(2, chunks.len());
1981
1982        {
1983            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
1984            let expected_timelines = [
1985                (
1986                    *timeline1.name(),
1987                    TimeColumn::new(Some(false), timeline1, vec![45, 42, 43].into()),
1988                ),
1989                (
1990                    *timeline2.name(),
1991                    TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001].into()),
1992                ),
1993            ];
1994            let expected_components = [(
1995                MyPoints::descriptor_points(),
1996                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
1997            )];
1998            let expected_chunk = Chunk::from_native_row_ids(
1999                chunks[0].id,
2000                entity_path1.clone(),
2001                None,
2002                &expected_row_ids,
2003                expected_timelines.into_iter().collect(),
2004                expected_components.into_iter().collect(),
2005            )?;
2006
2007            eprintln!("Expected:\n{expected_chunk}");
2008            eprintln!("Got:\n{}", chunks[0]);
2009            assert_eq!(expected_chunk, chunks[0]);
2010        }
2011
2012        {
2013            let expected_row_ids = vec![row4.row_id];
2014            let expected_timelines = [
2015                (
2016                    *timeline1.name(),
2017                    TimeColumn::new(Some(true), timeline1, vec![44].into()),
2018                ),
2019                (
2020                    *timeline2.name(),
2021                    TimeColumn::new(Some(true), timeline2, vec![1002].into()),
2022                ),
2023            ];
2024            let expected_components = [(
2025                MyPoints::descriptor_points(),
2026                arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(),
2027            )];
2028            let expected_chunk = Chunk::from_native_row_ids(
2029                chunks[1].id,
2030                entity_path1.clone(),
2031                None,
2032                &expected_row_ids,
2033                expected_timelines.into_iter().collect(),
2034                expected_components.into_iter().collect(),
2035            )?;
2036
2037            eprintln!("Expected:\n{expected_chunk}");
2038            eprintln!("Got:\n{}", chunks[1]);
2039            assert_eq!(expected_chunk, chunks[1]);
2040        }
2041
2042        Ok(())
2043    }
2044}