re_chunk/batcher.rs

use std::{
    hash::{Hash as _, Hasher as _},
    sync::Arc,
    time::{Duration, Instant},
};

use arrow::array::{Array as ArrowArray, ArrayRef};
use arrow::buffer::ScalarBuffer as ArrowScalarBuffer;
use crossbeam::channel::{Receiver, Sender};
use nohash_hasher::IntMap;

use re_arrow_util::arrays_to_list_array_opt;
use re_byte_size::SizeBytes as _;
use re_log_types::{EntityPath, ResolvedTimeRange, TimeInt, TimePoint, Timeline, TimelineName};
use re_types_core::ComponentDescriptor;

use crate::{chunk::ChunkComponents, Chunk, ChunkId, ChunkResult, RowId, TimeColumn};

// ---

/// Errors that can occur when creating/manipulating a [`ChunkBatcher`].
#[derive(thiserror::Error, Debug)]
pub enum ChunkBatcherError {
    /// Error when parsing configuration from environment.
    #[error("Failed to parse config: '{name}={value}': {err}")]
    ParseConfig {
        name: &'static str,
        value: String,
        err: Box<dyn std::error::Error + Send + Sync>,
    },

    /// Error spawning one of the background threads.
    #[error("Failed to spawn background thread '{name}': {err}")]
    SpawnThread {
        name: &'static str,
        err: Box<dyn std::error::Error + Send + Sync>,
    },
}

pub type ChunkBatcherResult<T> = Result<T, ChunkBatcherError>;

/// Callbacks you can install on the [`ChunkBatcher`].
#[derive(Clone, Default)]
pub struct BatcherHooks {
    /// Called when a new row arrives.
    ///
    /// The callback is given the slice of all rows not yet batched,
    /// including the new one.
    ///
    /// Used for testing.
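    ///
    /// A minimal sketch of installing a hook (the callback body is hypothetical):
    ///
    /// ```ignore
    /// let hooks = BatcherHooks {
    ///     on_insert: Some(Arc::new(|rows: &[PendingRow]| {
    ///         eprintln!("{} rows pending", rows.len());
    ///     })),
    ///     ..BatcherHooks::NONE
    /// };
    /// ```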
    #[allow(clippy::type_complexity)]
    pub on_insert: Option<Arc<dyn Fn(&[PendingRow]) + Send + Sync>>,

    /// Callback to be run when an Arrow Chunk goes out of scope.
    ///
    /// See [`re_log_types::ArrowRecordBatchReleaseCallback`] for more information.
    //
    // TODO(#6412): probably don't need this anymore.
    pub on_release: Option<re_log_types::ArrowRecordBatchReleaseCallback>,
}

impl BatcherHooks {
    pub const NONE: Self = Self {
        on_insert: None,
        on_release: None,
    };
}

impl PartialEq for BatcherHooks {
    fn eq(&self, other: &Self) -> bool {
        let Self {
            on_insert,
            on_release,
        } = self;

        let on_insert_eq = match (on_insert, &other.on_insert) {
            (Some(a), Some(b)) => Arc::ptr_eq(a, b),
            (None, None) => true,
            _ => false,
        };

        on_insert_eq && on_release == &other.on_release
    }
}

impl std::fmt::Debug for BatcherHooks {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            on_insert,
            on_release,
        } = self;
        f.debug_struct("BatcherHooks")
            .field("on_insert", &on_insert.as_ref().map(|_| "…"))
            .field("on_release", &on_release)
            .finish()
    }
}

// ---

/// Defines the different thresholds of the associated [`ChunkBatcher`].
///
/// See [`Self::default`] and [`Self::from_env`].
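///
/// A hedged sketch of overriding individual thresholds in code (field names as below):
///
/// ```ignore
/// let config = ChunkBatcherConfig {
///     flush_tick: std::time::Duration::from_millis(100),
///     flush_num_rows: 10_000,
///     ..ChunkBatcherConfig::default()
/// };
/// ```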
#[derive(Clone, Debug, PartialEq)]
pub struct ChunkBatcherConfig {
    /// Duration of the periodic tick.
    //
    // NOTE: We use `std::time` directly because this library has to deal with `crossbeam` as well
    // as std threads, which both expect standard types anyway.
    //
    // TODO(cmc): Add support for burst debouncing.
    pub flush_tick: Duration,

    /// Flush if the accumulated payload has a size in bytes equal to or greater than this.
    ///
    /// The resulting [`Chunk`] might be larger than `flush_num_bytes`!
    pub flush_num_bytes: u64,

    /// Flush if the accumulated payload has a number of rows equal to or greater than this.
    pub flush_num_rows: u64,

    /// Split a chunk if it contains at least this many rows and one or more of its timelines
    /// are unsorted.
    pub chunk_max_rows_if_unsorted: u64,

    /// Size of the internal channel of commands.
    ///
    /// Unbounded if left unspecified.
    pub max_commands_in_flight: Option<u64>,

    /// Size of the internal channel of [`Chunk`]s.
    ///
    /// Unbounded if left unspecified.
    pub max_chunks_in_flight: Option<u64>,

    /// Callbacks you can install on the [`ChunkBatcher`].
    pub hooks: BatcherHooks,
}

impl Default for ChunkBatcherConfig {
    fn default() -> Self {
        Self::DEFAULT
    }
}

impl ChunkBatcherConfig {
    /// Default configuration, applicable to most use cases.
    pub const DEFAULT: Self = Self {
        flush_tick: Duration::from_millis(8), // We want it fast enough for 60 Hz, for a real-time camera feel
        flush_num_bytes: 1024 * 1024,         // 1 MiB
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
        hooks: BatcherHooks::NONE,
    };

    /// Always flushes ASAP.
    pub const ALWAYS: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: 0,
        flush_num_rows: 0,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
        hooks: BatcherHooks::NONE,
    };

    /// Never flushes unless manually told to (or when hitting one of the builtin invariants).
    pub const NEVER: Self = Self {
        flush_tick: Duration::MAX,
        flush_num_bytes: u64::MAX,
        flush_num_rows: u64::MAX,
        chunk_max_rows_if_unsorted: 256,
        max_commands_in_flight: None,
        max_chunks_in_flight: None,
        hooks: BatcherHooks::NONE,
    };

    /// Environment variable to configure [`Self::flush_tick`].
    pub const ENV_FLUSH_TICK: &'static str = "RERUN_FLUSH_TICK_SECS";

    /// Environment variable to configure [`Self::flush_num_bytes`].
    pub const ENV_FLUSH_NUM_BYTES: &'static str = "RERUN_FLUSH_NUM_BYTES";

    /// Environment variable to configure [`Self::flush_num_rows`].
    pub const ENV_FLUSH_NUM_ROWS: &'static str = "RERUN_FLUSH_NUM_ROWS";

    /// Environment variable to configure [`Self::chunk_max_rows_if_unsorted`].
    //
    // NOTE: Shared with the same env-var on the store side, for consistency.
    pub const ENV_CHUNK_MAX_ROWS_IF_UNSORTED: &'static str = "RERUN_CHUNK_MAX_ROWS_IF_UNSORTED";

    /// Environment variable to configure [`Self::chunk_max_rows_if_unsorted`].
    #[deprecated(note = "use `RERUN_CHUNK_MAX_ROWS_IF_UNSORTED` instead")]
    const ENV_MAX_CHUNK_ROWS_IF_UNSORTED: &'static str = "RERUN_MAX_CHUNK_ROWS_IF_UNSORTED";

    /// Creates a new `ChunkBatcherConfig` using the default values, optionally overridden
    /// through the environment.
    ///
    /// See [`Self::apply_env`].
    #[inline]
    pub fn from_env() -> ChunkBatcherResult<Self> {
        Self::default().apply_env()
    }

    /// Returns a copy of `self`, overriding existing fields with values from the environment if
    /// they are present.
    ///
    /// See [`Self::ENV_FLUSH_TICK`], [`Self::ENV_FLUSH_NUM_BYTES`], [`Self::ENV_FLUSH_NUM_ROWS`].
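    ///
    /// A minimal sketch of an environment override (the values are illustrative):
    ///
    /// ```ignore
    /// std::env::set_var(ChunkBatcherConfig::ENV_FLUSH_TICK, "0.25"); // seconds
    /// let config = ChunkBatcherConfig::default().apply_env()?;
    /// assert_eq!(config.flush_tick, std::time::Duration::from_millis(250));
    /// ```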
    pub fn apply_env(&self) -> ChunkBatcherResult<Self> {
        let mut new = self.clone();

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_TICK) {
            let flush_duration_secs: f64 =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_TICK,
                    value: s.clone(),
                    err: Box::new(err),
                })?;

            new.flush_tick = Duration::from_secs_f64(flush_duration_secs);
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_BYTES) {
            if let Some(num_bytes) = re_format::parse_bytes(&s) {
                // e.g. "10MB"
                new.flush_num_bytes = num_bytes.unsigned_abs();
            } else {
                // Assume it's just an integer
                new.flush_num_bytes = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_FLUSH_NUM_BYTES,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
            }
        }

        if let Ok(s) = std::env::var(Self::ENV_FLUSH_NUM_ROWS) {
            new.flush_num_rows = s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                name: Self::ENV_FLUSH_NUM_ROWS,
                value: s.clone(),
                err: Box::new(err),
            })?;
        }

        if let Ok(s) = std::env::var(Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_CHUNK_MAX_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        // Deprecated
        #[expect(deprecated)]
        if let Ok(s) = std::env::var(Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED) {
            new.chunk_max_rows_if_unsorted =
                s.parse().map_err(|err| ChunkBatcherError::ParseConfig {
                    name: Self::ENV_MAX_CHUNK_ROWS_IF_UNSORTED,
                    value: s.clone(),
                    err: Box::new(err),
                })?;
        }

        Ok(new)
    }
}
#[test]
fn chunk_batcher_config() {
    // Detect breaking changes in our environment variables.
    std::env::set_var("RERUN_FLUSH_TICK_SECS", "0.3");
    std::env::set_var("RERUN_FLUSH_NUM_BYTES", "42");
    std::env::set_var("RERUN_FLUSH_NUM_ROWS", "666");
    std::env::set_var("RERUN_CHUNK_MAX_ROWS_IF_UNSORTED", "7777");

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 7777,
        ..Default::default()
    };
    assert_eq!(expected, config);

    std::env::set_var("RERUN_MAX_CHUNK_ROWS_IF_UNSORTED", "9999");

    let config = ChunkBatcherConfig::from_env().unwrap();
    let expected = ChunkBatcherConfig {
        flush_tick: Duration::from_millis(300),
        flush_num_bytes: 42,
        flush_num_rows: 666,
        chunk_max_rows_if_unsorted: 9999,
        ..Default::default()
    };
    assert_eq!(expected, config);
}

// ---

/// Implements an asynchronous batcher that coalesces [`PendingRow`]s into [`Chunk`]s based upon
/// the thresholds defined in the associated [`ChunkBatcherConfig`].
///
/// ## Batching vs. splitting
///
/// The batching process is triggered solely by time and space thresholds -- whichever is hit first.
/// This process will result in one big dataframe.
///
/// The splitting process will then run on top of that big dataframe, and split it further down
/// into smaller [`Chunk`]s.
/// Specifically, the dataframe will be split into enough [`Chunk`]s so as to guarantee that:
/// * no chunk contains data for more than one entity path
/// * no chunk contains rows with different sets of timelines
/// * no chunk uses more than one datatype for a given component
/// * no chunk contains more rows than a pre-configured threshold if one or more timelines are unsorted
///
/// ## Multithreading and ordering
///
/// [`ChunkBatcher`] can be cheaply cloned and used freely across any number of threads.
///
/// Internally, all operations are linearized into a pipeline:
/// - All operations sent by a given thread will take effect in the same exact order as that
///   thread originally sent them in, from its point of view.
/// - There isn't any well-defined global order across multiple threads.
///
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all
/// previous data sent by the calling thread has been batched and sent down the channel returned
/// by [`ChunkBatcher::chunks`]; no more, no less.
///
/// ## Shutdown
///
/// The batcher can only be shut down by dropping all instances of it, at which point it will
/// automatically take care of flushing any pending data that might remain in the pipeline.
///
/// Shutting down cannot ever block.
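///
/// ## Example
///
/// A minimal end-to-end sketch (assumes `entity_path` and `row` were built elsewhere):
///
/// ```ignore
/// let batcher = ChunkBatcher::new(ChunkBatcherConfig::default())?;
/// let chunks_rx = batcher.chunks();
///
/// batcher.push_row(entity_path, row);
/// batcher.flush_blocking();
///
/// for chunk in chunks_rx.try_iter() {
///     // Handle the batched `Chunk`s…
/// }
/// ```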
#[derive(Clone)]
pub struct ChunkBatcher {
    inner: Arc<ChunkBatcherInner>,
}

// NOTE: The receiving end of the command stream as well as the sending end of the chunk stream are
// owned solely by the batching thread.
struct ChunkBatcherInner {
    /// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed,
    /// therefore the `Drop` implementation can be certain that no more data will come in while
    /// it's running.
    tx_cmds: Sender<Command>,
    // NOTE: Option so we can make shutdown non-blocking even with bounded channels.
    rx_chunks: Option<Receiver<Chunk>>,
    cmds_to_chunks_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for ChunkBatcherInner {
    fn drop(&mut self) {
        // Drop the receiving end of the chunk stream first and foremost, so that we don't block
        // even if the output channel is bounded and currently full.
        if let Some(rx_chunks) = self.rx_chunks.take() {
            if !rx_chunks.is_empty() {
                re_log::warn!("Dropping data");
            }
        }

        // NOTE: The command channel is private; if we're here, nothing is currently capable of
        // sending data down the pipeline.
        self.tx_cmds.send(Command::Shutdown).ok();
        if let Some(handle) = self.cmds_to_chunks_handle.take() {
            handle.join().ok();
        }
    }
}

enum Command {
    AppendChunk(Chunk),
    AppendRow(EntityPath, PendingRow),
    Flush(Sender<()>),
    Shutdown,
}

impl Command {
    fn flush() -> (Self, Receiver<()>) {
        let (tx, rx) = crossbeam::channel::bounded(0); // oneshot
        (Self::Flush(tx), rx)
    }
}

impl ChunkBatcher {
    /// Creates a new [`ChunkBatcher`] using the passed in `config`.
    ///
    /// The returned object must be kept in scope: dropping it will trigger a clean shutdown of the
    /// batcher.
    #[must_use = "Batching threads will automatically shut down when this object is dropped"]
    #[allow(clippy::needless_pass_by_value)]
    pub fn new(config: ChunkBatcherConfig) -> ChunkBatcherResult<Self> {
        let (tx_cmds, rx_cmd) = match config.max_commands_in_flight {
            Some(cap) => crossbeam::channel::bounded(cap as _),
            None => crossbeam::channel::unbounded(),
        };

        let (tx_chunk, rx_chunks) = match config.max_chunks_in_flight {
            Some(cap) => crossbeam::channel::bounded(cap as _),
            None => crossbeam::channel::unbounded(),
        };

        let cmds_to_chunks_handle = {
            const NAME: &str = "ChunkBatcher::cmds_to_chunks";
            std::thread::Builder::new()
                .name(NAME.into())
                .spawn({
                    let config = config.clone();
                    move || batching_thread(config, rx_cmd, tx_chunk)
                })
                .map_err(|err| ChunkBatcherError::SpawnThread {
                    name: NAME,
                    err: Box::new(err),
                })?
        };

        re_log::debug!(?config, "creating new chunk batcher");

        let inner = ChunkBatcherInner {
            tx_cmds,
            rx_chunks: Some(rx_chunks),
            cmds_to_chunks_handle: Some(cmds_to_chunks_handle),
        };

        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    // --- Send commands ---

    /// Pushes a pre-built [`Chunk`] down the batching pipeline.
    pub fn push_chunk(&self, chunk: Chunk) {
        self.inner.push_chunk(chunk);
    }

    /// Pushes a [`PendingRow`] down the batching pipeline.
    ///
    /// This will compute the size of the row on the batching thread!
    ///
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
    #[inline]
    pub fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.inner.push_row(entity_path, row);
    }

    /// Initiates a flush of the pipeline and returns immediately.
    ///
    /// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
    #[inline]
    pub fn flush_async(&self) {
        self.inner.flush_async();
    }

    /// Initiates a flush of the batching pipeline and waits for it to propagate.
    ///
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
    #[inline]
    pub fn flush_blocking(&self) {
        self.inner.flush_blocking();
    }

    // --- Subscribe to chunks ---

    /// Returns a _shared_ channel on which the batched [`Chunk`]s are sent.
    ///
    /// Shutting down the batcher will close this channel.
    ///
    /// See [`ChunkBatcher`] docs for ordering semantics and multithreading guarantees.
    pub fn chunks(&self) -> Receiver<Chunk> {
        // NOTE: `rx_chunks` is only ever taken when the batcher as a whole is dropped, at which
        // point it is impossible to call this method.
        #[allow(clippy::unwrap_used)]
        self.inner.rx_chunks.clone().unwrap()
    }
}

impl ChunkBatcherInner {
    fn push_chunk(&self, chunk: Chunk) {
        self.send_cmd(Command::AppendChunk(chunk));
    }

    fn push_row(&self, entity_path: EntityPath, row: PendingRow) {
        self.send_cmd(Command::AppendRow(entity_path, row));
    }

    fn flush_async(&self) {
        let (flush_cmd, _) = Command::flush();
        self.send_cmd(flush_cmd);
    }

    fn flush_blocking(&self) {
        let (flush_cmd, oneshot) = Command::flush();
        self.send_cmd(flush_cmd);
        oneshot.recv().ok();
    }

    fn send_cmd(&self, cmd: Command) {
        // NOTE: Internal channels can never be closed outside of the `Drop` impl, so this cannot
        // fail.
        self.tx_cmds.send(cmd).ok();
    }
}

#[allow(clippy::needless_pass_by_value)]
fn batching_thread(config: ChunkBatcherConfig, rx_cmd: Receiver<Command>, tx_chunk: Sender<Chunk>) {
    let rx_tick = crossbeam::channel::tick(config.flush_tick);

    struct Accumulator {
        latest: Instant,
        entity_path: EntityPath,
        pending_rows: Vec<PendingRow>,
        pending_num_bytes: u64,
    }

    impl Accumulator {
        fn new(entity_path: EntityPath) -> Self {
            Self {
                entity_path,
                latest: Instant::now(),
                pending_rows: Default::default(),
                pending_num_bytes: Default::default(),
            }
        }

        fn reset(&mut self) {
            self.latest = Instant::now();
            self.pending_rows.clear();
            self.pending_num_bytes = 0;
        }
    }

    let mut accs: IntMap<EntityPath, Accumulator> = IntMap::default();

    fn do_push_row(acc: &mut Accumulator, row: PendingRow) {
        acc.pending_num_bytes += row.total_size_bytes();
        acc.pending_rows.push(row);
    }

    fn do_flush_all(
        acc: &mut Accumulator,
        tx_chunk: &Sender<Chunk>,
        reason: &str,
        chunk_max_rows_if_unsorted: u64,
    ) {
        let rows = std::mem::take(&mut acc.pending_rows);
        if rows.is_empty() {
            return;
        }

        re_log::trace!(
            "Flushing {} rows and {} bytes. Reason: {reason}",
            rows.len(),
            re_format::format_bytes(acc.pending_num_bytes as _)
        );

        let chunks =
            PendingRow::many_into_chunks(acc.entity_path.clone(), chunk_max_rows_if_unsorted, rows);
        for chunk in chunks {
            let mut chunk = match chunk {
                Ok(chunk) => chunk,
                Err(err) => {
                    re_log::error!(%err, "corrupt chunk detected, dropping");
                    continue;
                }
            };

            // NOTE: This can only fail if all receivers have been dropped, which simply cannot happen
            // as long as the batching thread is alive… which is where we currently are.

            let split_indicators = chunk.split_indicators();
            if !chunk.components.is_empty() {
                // make sure the chunk didn't contain *only* indicators!
                tx_chunk.send(chunk).ok();
            }
            if let Some(split_indicators) = split_indicators {
                tx_chunk.send(split_indicators).ok();
            }
        }

        acc.reset();
    }

    re_log::trace!(
        "Flushing every: {:.2}s, {} rows, {}",
        config.flush_tick.as_secs_f64(),
        config.flush_num_rows,
        re_format::format_bytes(config.flush_num_bytes as _),
    );

    // Set to `true` when a flush is triggered for a reason other than hitting the time threshold,
    // so that the next tick will not unnecessarily fire early.
    let mut skip_next_tick = false;

    use crossbeam::select;
    loop {
        select! {
            recv(rx_cmd) -> cmd => {
                let Ok(cmd) = cmd else {
                    // All command senders are gone, which can only happen if the
                    // `ChunkBatcher` itself has been dropped.
                    break;
                };

                match cmd {
                    Command::AppendChunk(mut chunk) => {
                        // NOTE: This can only fail if all receivers have been dropped, which simply cannot happen
                        // as long as the batching thread is alive… which is where we currently are.

                        let split_indicators = chunk.split_indicators();
                        if !chunk.components.is_empty() {
                            // make sure the chunk didn't contain *only* indicators!
                            tx_chunk.send(chunk).ok();
                        }
                        if let Some(split_indicators) = split_indicators {
                            tx_chunk.send(split_indicators).ok();
                        }
                    },
                    Command::AppendRow(entity_path, row) => {
                        let acc = accs.entry(entity_path.clone())
                            .or_insert_with(|| Accumulator::new(entity_path));
                        do_push_row(acc, row);

                        if let Some(on_insert) = config.hooks.on_insert.as_ref() {
                            on_insert(&acc.pending_rows);
                        }

                        if acc.pending_rows.len() as u64 >= config.flush_num_rows {
                            do_flush_all(acc, &tx_chunk, "rows", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        } else if acc.pending_num_bytes >= config.flush_num_bytes {
                            do_flush_all(acc, &tx_chunk, "bytes", config.chunk_max_rows_if_unsorted);
                            skip_next_tick = true;
                        }
                    },

                    Command::Flush(oneshot) => {
                        skip_next_tick = true;
                        for acc in accs.values_mut() {
                            do_flush_all(acc, &tx_chunk, "manual", config.chunk_max_rows_if_unsorted);
                        }
                        drop(oneshot); // closing the channel signals the waiting receiver
                    },

                    Command::Shutdown => break,
                };
            },

            recv(rx_tick) -> _ => {
                if skip_next_tick {
                    skip_next_tick = false;
                } else {
                    // TODO(cmc): It would probably be better to have a ticker per entity path. Maybe. At some point.
                    for acc in accs.values_mut() {
                        do_flush_all(acc, &tx_chunk, "tick", config.chunk_max_rows_if_unsorted);
                    }
                }
            },
        };
    }

    drop(rx_cmd);
    for acc in accs.values_mut() {
        do_flush_all(
            acc,
            &tx_chunk,
            "shutdown",
            config.chunk_max_rows_if_unsorted,
        );
    }
    drop(tx_chunk);

    // NOTE: The receiving end of the command stream as well as the sending end of the chunk
    // stream are owned solely by this thread.
    // Past this point, all command writes and all chunk reads will return `ErrDisconnected`.
}

// ---

/// A single row's worth of data (i.e. a single log call).
///
/// Send those to the batcher to build up a [`Chunk`].
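///
/// A hedged construction sketch (`MyPoint` is an example component from the tests below,
/// and `points_array` a pre-built Arrow array):
///
/// ```ignore
/// let row = PendingRow::new(
///     TimePoint::default().with(Timeline::new_sequence("frame"), 42),
///     std::iter::once((MyPoint::descriptor(), points_array)).collect(),
/// );
/// ```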
#[derive(Debug, Clone)]
pub struct PendingRow {
    /// Auto-generated `TUID`, uniquely identifying this event and keeping track of the client's
    /// wall-clock.
    pub row_id: RowId,

    /// User-specified [`TimePoint`] for this event.
    pub timepoint: TimePoint,

    /// The component data.
    ///
    /// Each array is a single component, i.e. _not_ a list array.
    pub components: IntMap<ComponentDescriptor, ArrayRef>,
}

impl PendingRow {
    #[inline]
    pub fn new(timepoint: TimePoint, components: IntMap<ComponentDescriptor, ArrayRef>) -> Self {
        Self {
            row_id: RowId::new(),
            timepoint,
            components,
        }
    }
}

impl re_byte_size::SizeBytes for PendingRow {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        row_id.heap_size_bytes() + timepoint.heap_size_bytes() + components.heap_size_bytes()
    }
}

impl PendingRow {
    /// Turn a single row into a [`Chunk`] of its own.
    ///
    /// That's very wasteful, probably don't do that outside of testing, or unless you have very
    /// good reasons to.
    ///
    /// See also [`Self::many_into_chunks`].
    pub fn into_chunk(self, entity_path: EntityPath) -> ChunkResult<Chunk> {
        let Self {
            row_id,
            timepoint,
            components,
        } = self;

        let timelines = timepoint
            .into_iter()
            .map(|(timeline_name, cell)| {
                let times = ArrowScalarBuffer::from(vec![cell.as_i64()]);
                let time_column =
                    TimeColumn::new(Some(true), Timeline::new(timeline_name, cell.typ()), times);
                (timeline_name, time_column)
            })
            .collect();

        let mut per_name = ChunkComponents::default();
        for (component_desc, array) in components {
            let list_array = arrays_to_list_array_opt(&[Some(&*array as _)]);
            if let Some(list_array) = list_array {
                per_name.insert_descriptor(component_desc, list_array);
            }
        }

        Chunk::from_native_row_ids(
            ChunkId::new(),
            entity_path,
            Some(true),
            &[row_id],
            timelines,
            per_name,
        )
    }

    /// This turns a batch of [`PendingRow`]s into one or more [`Chunk`]s.
    ///
    /// There are a lot of conditions to fulfill for a [`Chunk`] to be valid: this helper makes
    /// sure to fulfill all of them by splitting the chunk into one or more pieces as necessary.
    ///
    /// In particular, a [`Chunk`] cannot:
    /// * contain data for more than one entity path
    /// * contain rows with different sets of timelines
    /// * use more than one datatype for a given component
    /// * contain more rows than a pre-configured threshold if one or more timelines are unsorted
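    ///
    /// A minimal usage sketch (assumes `rows` was accumulated elsewhere):
    ///
    /// ```ignore
    /// for chunk in PendingRow::many_into_chunks("a/b/c".into(), 256, rows) {
    ///     let chunk = chunk?;
    ///     // Each yielded chunk upholds all of the invariants above.
    /// }
    /// ```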
    //
    // TODO(cmc): there are lots of performance improvement opportunities in this one, but let's
    // see if that actually matters in practice first.
    pub fn many_into_chunks(
        entity_path: EntityPath,
        chunk_max_rows_if_unsorted: u64,
        mut rows: Vec<Self>,
    ) -> impl Iterator<Item = ChunkResult<Chunk>> {
        re_tracing::profile_function!();

        // First things first, sort all the rows by row ID -- that's our global order and it holds
        // true no matter what.
        {
            re_tracing::profile_scope!("sort rows");
            rows.sort_by_key(|row| row.row_id);
        }

        // Then organize the rows into micro-batches -- one batch per unique set of timelines.
        let mut per_timeline_set: IntMap<u64 /* Timeline set */, Vec<Self>> = Default::default();
        {
            re_tracing::profile_scope!("compute timeline sets");

            // The hash is deterministic because the traversal of a `TimePoint` is itself
            // deterministic: `TimePoint` is backed by a `BTreeMap`.
            for row in rows {
                let mut hasher = ahash::AHasher::default();
                row.timepoint.timeline_names().for_each(|timeline| {
                    <TimelineName as std::hash::Hash>::hash(timeline, &mut hasher);
                });

                per_timeline_set
                    .entry(hasher.finish())
                    .or_default()
                    .push(row);
            }
        }

        per_timeline_set.into_values().flat_map(move |rows| {
            re_tracing::profile_scope!("iterate per timeline set");

            // Then we split the micro-batches even further -- one sub-batch per unique set of datatypes.
            let mut per_datatype_set: IntMap<u64 /* ArrowDatatype set */, Vec<Self>> =
                Default::default();
            {
                re_tracing::profile_scope!("compute datatype sets");

                // The hash is dependent on the order in which the `PendingRow` was created (i.e.
                // the order in which its components were inserted).
                //
                // This is because the components are stored in an `IntMap`, which doesn't do any
                // hashing. For that reason, the traversal order of an `IntMap` is deterministic:
                // it's always the same for `IntMap`s that share the same keys, as long as these
                // keys were inserted in the same order.
                // See `intmap_order_is_deterministic` in the tests below.
                //
                // In practice, the `PendingRow`s in a given program are always built in the same
                // order for the duration of that program, which is why this works.
                // See `simple_but_hashes_might_not_match` in the tests below.
                for row in rows {
                    let mut hasher = ahash::AHasher::default();
                    row.components
                        .values()
                        .for_each(|array| array.data_type().hash(&mut hasher));
                    per_datatype_set
                        .entry(hasher.finish())
                        .or_default()
                        .push(row);
                }
            }

            // And finally we can build the resulting chunks.
            let entity_path = entity_path.clone();
            per_datatype_set.into_values().flat_map(move |rows| {
                re_tracing::profile_scope!("iterate per datatype set");

                let mut row_ids: Vec<RowId> = Vec::with_capacity(rows.len());
                let mut timelines: IntMap<TimelineName, PendingTimeColumn> = IntMap::default();

                // Create all the logical list arrays that we're going to need, accounting for the
                // possibility of sparse components in the data.
                let mut all_components: IntMap<ComponentDescriptor, Vec<Option<&dyn ArrowArray>>> =
                    IntMap::default();
                for row in &rows {
                    for component_desc in row.components.keys() {
                        all_components.entry(component_desc.clone()).or_default();
                    }
                }

                let mut chunks = Vec::new();

                let mut components = all_components.clone();
                for row in &rows {
                    let Self {
                        row_id,
                        timepoint: row_timepoint,
                        components: row_components,
                    } = row;

                    // Look for unsorted timelines -- if we find any, and the chunk is larger than
                    // the pre-configured `chunk_max_rows_if_unsorted` threshold, then split _even_
                    // further!
                    for (&timeline_name, cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });

                        if !row_ids.is_empty() // just being extra cautious
                            && row_ids.len() as u64 >= chunk_max_rows_if_unsorted
                            && !time_column.is_sorted
                        {
                            chunks.push(Chunk::from_native_row_ids(
                                ChunkId::new(),
                                entity_path.clone(),
                                Some(true),
                                &std::mem::take(&mut row_ids),
                                std::mem::take(&mut timelines)
                                    .into_iter()
                                    .map(|(name, time_column)| (name, time_column.finish()))
                                    .collect(),
                                {
                                    let mut per_name = ChunkComponents::default();
                                    for (component_desc, arrays) in std::mem::take(&mut components)
                                    {
                                        let list_array = arrays_to_list_array_opt(&arrays);
                                        if let Some(list_array) = list_array {
                                            per_name.insert_descriptor(component_desc, list_array);
                                        }
                                    }
                                    per_name
                                },
                            ));

                            components = all_components.clone();
                        }
                    }

                    row_ids.push(*row_id);

                    for (&timeline_name, &cell) in row_timepoint {
                        let time_column = timelines.entry(timeline_name).or_insert_with(|| {
                            PendingTimeColumn::new(Timeline::new(timeline_name, cell.typ()))
                        });
                        time_column.push(cell.into());
                    }

                    for (component_desc, arrays) in &mut components {
                        // NOTE: This will push `None` if the row doesn't actually hold a value for this
                        // component -- these are sparse list arrays!
                        arrays.push(
                            row_components
                                .get(component_desc)
                                .map(|array| &**array as &dyn ArrowArray),
                        );
                    }
                }

                chunks.push(Chunk::from_native_row_ids(
                    ChunkId::new(),
                    entity_path.clone(),
                    Some(true),
                    &std::mem::take(&mut row_ids),
                    timelines
                        .into_iter()
                        .map(|(timeline, time_column)| (timeline, time_column.finish()))
                        .collect(),
                    {
                        let mut per_name = ChunkComponents::default();
                        for (component_desc, arrays) in components {
                            let list_array = arrays_to_list_array_opt(&arrays);
                            if let Some(list_array) = list_array {
                                per_name.insert_descriptor(component_desc, list_array);
                            }
                        }
                        per_name
                    },
                ));

                chunks
            })
        })
    }
}

/// Helper struct used to buffer time data.
///
/// See [`PendingRow::many_into_chunks`] for usage.
struct PendingTimeColumn {
    timeline: Timeline,
    times: Vec<i64>,
    is_sorted: bool,
    time_range: ResolvedTimeRange,
}

impl PendingTimeColumn {
    fn new(timeline: Timeline) -> Self {
        Self {
            timeline,
            times: Default::default(),
            is_sorted: true,
            time_range: ResolvedTimeRange::EMPTY,
        }
    }

    /// Push a single time value at the end of this chunk.
    fn push(&mut self, time: TimeInt) {
        let Self {
            timeline: _,
            times,
            is_sorted,
            time_range,
        } = self;

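        // The column stays sorted only as long as each new value is >= the last one
        // pushed so far (an empty column is trivially sorted).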
        *is_sorted &= times.last().copied().unwrap_or(TimeInt::MIN.as_i64()) <= time.as_i64();
        time_range.set_min(TimeInt::min(time_range.min(), time));
        time_range.set_max(TimeInt::max(time_range.max(), time));
        times.push(time.as_i64());
    }

    fn finish(self) -> TimeColumn {
        let Self {
            timeline,
            times,
            is_sorted,
            time_range,
        } = self;

        TimeColumn {
            timeline,
            times: ArrowScalarBuffer::from(times),
            is_sorted,
            time_range,
        }
    }
}

// ---

// NOTE:
// These tests only cover the chunk splitting conditions described in `many_into_chunks`.
// Temporal and spatial thresholds are already taken care of by the RecordingStream test suite.

#[cfg(test)]
mod tests {
    use crossbeam::channel::TryRecvError;

    use re_log_types::example_components::{MyColor, MyIndex, MyLabel, MyPoint, MyPoint64};
    use re_types_core::{Component as _, Loggable as _};

    use super::*;

    /// A bunch of rows that don't fit any of the split conditions should end up together.
    #[test]
    fn simple() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        let components1 = [
            (MyPoint::descriptor(), points1.clone()),
            (MyLabel::descriptor(), labels1.clone()),
            (MyIndex::descriptor(), indices1.clone()),
        ];
        let components2 = [
            (MyPoint::descriptor(), points2.clone()),
            (MyLabel::descriptor(), labels2.clone()),
            (MyIndex::descriptor(), indices2.clone()),
        ];
        let components3 = [
            (MyPoint::descriptor(), points3.clone()),
            (MyLabel::descriptor(), labels3.clone()),
            (MyIndex::descriptor(), indices3.clone()),
        ];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 43, 44].into()),
            )];
            let expected_components = [
                (
                    MyPoint::descriptor(),
                    arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
                ), //
                (
                    MyLabel::descriptor(),
                    arrays_to_list_array_opt(&[&*labels1, &*labels2, &*labels3].map(Some)).unwrap(),
                ), //
                (
                    MyIndex::descriptor(),
                    arrays_to_list_array_opt(&[&*indices1, &*indices2, &*indices3].map(Some))
                        .unwrap(),
                ), //
            ];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

    #[test]
    #[allow(clippy::len_zero)]
    fn simple_but_hashes_might_not_match() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let labels1 = MyLabel::to_arrow([MyLabel("a".into()), MyLabel("b".into())])?;
        let labels2 = MyLabel::to_arrow([MyLabel("c".into()), MyLabel("d".into())])?;
        let labels3 = MyLabel::to_arrow([MyLabel("e".into()), MyLabel("d".into())])?;

        let indices1 = MyIndex::to_arrow([MyIndex(0), MyIndex(1)])?;
        let indices2 = MyIndex::to_arrow([MyIndex(2), MyIndex(3)])?;
        let indices3 = MyIndex::to_arrow([MyIndex(4), MyIndex(5)])?;

        let components1 = [
            (MyIndex::descriptor(), indices1.clone()),
            (MyPoint::descriptor(), points1.clone()),
            (MyLabel::descriptor(), labels1.clone()),
        ];
        let components2 = [
            (MyPoint::descriptor(), points2.clone()),
            (MyLabel::descriptor(), labels2.clone()),
            (MyIndex::descriptor(), indices2.clone()),
        ];
        let components3 = [
            (MyLabel::descriptor(), labels3.clone()),
            (MyIndex::descriptor(), indices3.clone()),
            (MyPoint::descriptor(), points3.clone()),
        ];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        // The rows' components were inserted in different orders, and therefore the resulting
        // `IntMap`s *may* have different traversal orders, which ultimately means that the datatype
        // hashes *may* end up being different: i.e., possibly no batching.
        //
        // In practice, it's still possible to get lucky and end up with two maps that just happen
        // to share the same iteration order regardless, which is why this assertion is overly broad.
        // Try running this test with `--show-output`.
        assert!(chunks.len() >= 1);

        Ok(())
    }

    #[test]
    #[allow(clippy::zero_sized_map_values)]
    fn intmap_order_is_deterministic() {
        let descriptors = [
            MyPoint::descriptor(),
            MyColor::descriptor(),
            MyLabel::descriptor(),
            MyPoint64::descriptor(),
            MyIndex::descriptor(),
        ];

        let expected: IntMap<ComponentDescriptor, ()> =
            descriptors.iter().cloned().map(|d| (d, ())).collect();
        let expected: Vec<_> = expected.into_keys().collect();

        for _ in 0..1_000 {
            let got_collect: IntMap<ComponentDescriptor, ()> =
                descriptors.clone().into_iter().map(|d| (d, ())).collect();
            let got_collect: Vec<_> = got_collect.into_keys().collect();

            let mut got_insert: IntMap<ComponentDescriptor, ()> = Default::default();
            for d in descriptors.clone() {
                got_insert.insert(d, ());
            }
            let got_insert: Vec<_> = got_insert.into_keys().collect();

            assert_eq!(expected, got_collect);
            assert_eq!(expected, got_insert);
        }
    }

    /// A bunch of rows that don't fit any of the split conditions should end up together.
    #[test]
    fn simple_static() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let static_ = TimePoint::default();

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())];
        let components3 = [(MyPoint::descriptor(), points3.clone())];

        let row1 = PendingRow::new(static_.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(static_.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(static_.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }
1320
1321    /// A bunch of rows belonging to different entities will end up in different batches.
1322    #[test]
1323    fn different_entities() -> anyhow::Result<()> {
1324        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;
1325
1326        let timeline1 = Timeline::new_duration("log_time");
1327
1328        let timepoint1 = TimePoint::default().with(timeline1, 42);
1329        let timepoint2 = TimePoint::default().with(timeline1, 43);
1330        let timepoint3 = TimePoint::default().with(timeline1, 44);
1331
1332        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
1333        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
1334        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
1335
1336        let components1 = [(MyPoint::descriptor(), points1.clone())];
1337        let components2 = [(MyPoint::descriptor(), points2.clone())];
1338        let components3 = [(MyPoint::descriptor(), points3.clone())];
1339
1340        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
1341        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
1342        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());
1343
1344        let entity_path1: EntityPath = "ent1".into();
1345        let entity_path2: EntityPath = "ent2".into();
1346        batcher.push_row(entity_path1.clone(), row1.clone());
1347        batcher.push_row(entity_path2.clone(), row2.clone());
1348        batcher.push_row(entity_path1.clone(), row3.clone());
1349
1350        let chunks_rx = batcher.chunks();
1351        drop(batcher); // flush and close
1352
1353        let mut chunks = Vec::new();
1354        loop {
1355            let chunk = match chunks_rx.try_recv() {
1356                Ok(chunk) => chunk,
1357                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
1358                Err(TryRecvError::Disconnected) => break,
1359            };
1360            chunks.push(chunk);
1361        }
1362
1363        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);
1364
1365        // Make the programmer's life easier if this test fails.
1366        eprintln!("Chunks:");
1367        for chunk in &chunks {
1368            eprintln!("{chunk}");
1369        }
1370
1371        assert_eq!(2, chunks.len());
1372
        {
            let expected_row_ids = vec![row1.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
            )];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

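        // Second chunk: the lone row logged to `ent2`.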
        {
            let expected_row_ids = vec![row2.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![43].into()),
            )];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path2.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

    /// A bunch of rows with different sets of timelines will end up in different batches.
    #[test]
    fn different_timelines() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline1, 43)
            .with(timeline2, 1000);
        let timepoint3 = TimePoint::default()
            .with(timeline1, 44)
            .with(timeline2, 1001);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())];
        let components3 = [(MyPoint::descriptor(), points3.clone())];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

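        // Row 1 was logged to `log_time` only, so it cannot share a chunk with rows 2 and 3.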
        {
            let expected_row_ids = vec![row1.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42].into()),
            )];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

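        // Rows 2 and 3 share the same timeline set (`log_time` + `frame_nr`), so they batch together.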
        {
            let expected_row_ids = vec![row2.row_id, row3.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(true), timeline1, vec![43, 44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(true), timeline2, vec![1000, 1001].into()),
                ),
            ];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

    /// A bunch of rows with different datatypes will end up in different batches.
    #[test]
    fn different_datatypes() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig::NEVER)?;

        let timeline1 = Timeline::new_duration("log_time");

        let timepoint1 = TimePoint::default().with(timeline1, 42);
        let timepoint2 = TimePoint::default().with(timeline1, 43);
        let timepoint3 = TimePoint::default().with(timeline1, 44);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 =
            MyPoint64::to_arrow([MyPoint64::new(10.0, 20.0), MyPoint64::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())]; // same name, different datatype
        let components3 = [(MyPoint::descriptor(), points3.clone())];

        let row1 = PendingRow::new(timepoint1.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint2.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint3.clone(), components3.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

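        // Rows 1 and 3 both carry `MyPoint` data and so end up batched together.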
        {
            let expected_row_ids = vec![row1.row_id, row3.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![42, 44].into()),
            )];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

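        // Row 2 carries `MyPoint64` data under the same descriptor, which forces it
        // into its own chunk.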
        {
            let expected_row_ids = vec![row2.row_id];
            let expected_timelines = [(
                *timeline1.name(),
                TimeColumn::new(Some(true), timeline1, vec![43].into()),
            )];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points2].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }

    /// If one or more of the timelines end up unsorted, but the batch is below the unsorted length
    /// threshold, we don't do anything special.
    #[test]
    fn unsorted_timeline_below_threshold() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig {
            chunk_max_rows_if_unsorted: 1000,
            ..ChunkBatcherConfig::NEVER
        })?;

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let timepoint1 = TimePoint::default()
            .with(timeline2, 1000)
            .with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline2, 1001)
            .with(timeline1, 43);
        let timepoint3 = TimePoint::default()
            .with(timeline2, 1002)
            .with(timeline1, 44);
        let timepoint4 = TimePoint::default()
            .with(timeline2, 1003)
            .with(timeline1, 45);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
        let points4 =
            MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())];
        let components3 = [(MyPoint::descriptor(), points3.clone())];
        let components4 = [(MyPoint::descriptor(), points4.clone())];

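        // Note: row 1 is given the *latest* timepoint, so pushing the rows in this
        // order guarantees the resulting time columns come out unsorted.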
        let row1 = PendingRow::new(timepoint4.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint1.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint2.clone(), components3.into_iter().collect());
        let row4 = PendingRow::new(timepoint3.clone(), components4.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());
        batcher.push_row(entity_path1.clone(), row4.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(1, chunks.len());

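        // Four unsorted rows stay well below `chunk_max_rows_if_unsorted: 1000`, so
        // everything lands in a single chunk whose time columns are marked unsorted
        // (the `Some(false)` below).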
        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id, row4.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(false), timeline1, vec![45, 42, 43, 44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001, 1002].into()),
                ),
            ];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3, &*points4].map(Some))
                    .unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

        Ok(())
    }

    /// If one or more of the timelines end up unsorted, and the batch is above the unsorted length
    /// threshold, we split it.
    #[test]
    fn unsorted_timeline_above_threshold() -> anyhow::Result<()> {
        let batcher = ChunkBatcher::new(ChunkBatcherConfig {
            chunk_max_rows_if_unsorted: 3,
            ..ChunkBatcherConfig::NEVER
        })?;

        let timeline1 = Timeline::new_duration("log_time");
        let timeline2 = Timeline::new_sequence("frame_nr");

        let timepoint1 = TimePoint::default()
            .with(timeline2, 1000)
            .with(timeline1, 42);
        let timepoint2 = TimePoint::default()
            .with(timeline2, 1001)
            .with(timeline1, 43);
        let timepoint3 = TimePoint::default()
            .with(timeline2, 1002)
            .with(timeline1, 44);
        let timepoint4 = TimePoint::default()
            .with(timeline2, 1003)
            .with(timeline1, 45);

        let points1 = MyPoint::to_arrow([MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)])?;
        let points2 = MyPoint::to_arrow([MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)])?;
        let points3 = MyPoint::to_arrow([MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)])?;
        let points4 =
            MyPoint::to_arrow([MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)])?;

        let components1 = [(MyPoint::descriptor(), points1.clone())];
        let components2 = [(MyPoint::descriptor(), points2.clone())];
        let components3 = [(MyPoint::descriptor(), points3.clone())];
        let components4 = [(MyPoint::descriptor(), points4.clone())];

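        // As above, row 1 carries the latest timepoint so the batch ends up unsorted.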
        let row1 = PendingRow::new(timepoint4.clone(), components1.into_iter().collect());
        let row2 = PendingRow::new(timepoint1.clone(), components2.into_iter().collect());
        let row3 = PendingRow::new(timepoint2.clone(), components3.into_iter().collect());
        let row4 = PendingRow::new(timepoint3.clone(), components4.into_iter().collect());

        let entity_path1: EntityPath = "a/b/c".into();
        batcher.push_row(entity_path1.clone(), row1.clone());
        batcher.push_row(entity_path1.clone(), row2.clone());
        batcher.push_row(entity_path1.clone(), row3.clone());
        batcher.push_row(entity_path1.clone(), row4.clone());

        let chunks_rx = batcher.chunks();
        drop(batcher); // flush and close

        let mut chunks = Vec::new();
        loop {
            let chunk = match chunks_rx.try_recv() {
                Ok(chunk) => chunk,
                Err(TryRecvError::Empty) => panic!("expected chunk, got none"),
                Err(TryRecvError::Disconnected) => break,
            };
            chunks.push(chunk);
        }

        chunks.sort_by_key(|chunk| chunk.row_id_range().unwrap().0);

        // Make the programmer's life easier if this test fails.
        eprintln!("Chunks:");
        for chunk in &chunks {
            eprintln!("{chunk}");
        }

        assert_eq!(2, chunks.len());

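        // The first three unsorted rows hit `chunk_max_rows_if_unsorted: 3` and are
        // split off into their own (still unsorted) chunk.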
        {
            let expected_row_ids = vec![row1.row_id, row2.row_id, row3.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(false), timeline1, vec![45, 42, 43].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(false), timeline2, vec![1003, 1000, 1001].into()),
                ),
            ];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points1, &*points2, &*points3].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[0].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[0]);
            assert_eq!(expected_chunk, chunks[0]);
        }

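        // The remaining row starts a fresh chunk, which is trivially sorted
        // (hence `Some(true)`).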
        {
            let expected_row_ids = vec![row4.row_id];
            let expected_timelines = [
                (
                    *timeline1.name(),
                    TimeColumn::new(Some(true), timeline1, vec![44].into()),
                ),
                (
                    *timeline2.name(),
                    TimeColumn::new(Some(true), timeline2, vec![1002].into()),
                ),
            ];
            let expected_components = [(
                MyPoint::descriptor(),
                arrays_to_list_array_opt(&[&*points4].map(Some)).unwrap(),
            )];
            let expected_chunk = Chunk::from_native_row_ids(
                chunks[1].id,
                entity_path1.clone(),
                None,
                &expected_row_ids,
                expected_timelines.into_iter().collect(),
                expected_components.into_iter().collect(),
            )?;

            eprintln!("Expected:\n{expected_chunk}");
            eprintln!("Got:\n{}", chunks[1]);
            assert_eq!(expected_chunk, chunks[1]);
        }

        Ok(())
    }
}