arrow_select/coalesce.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after
//! operations such as [`filter`] and [`take`].
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{downcast_primitive, Array, ArrayRef, BooleanArray, RecordBatch};
use arrow_schema::{ArrowError, DataType, SchemaRef};
use std::collections::VecDeque;
use std::sync::Arc;
// Originally from DataFusion's coalesce module:
// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25

mod byte_view;
mod generic;
mod primitive;

use byte_view::InProgressByteViewArray;
use generic::GenericInProgressArray;
use primitive::InProgressPrimitiveArray;

/// Concatenate multiple [`RecordBatch`]es
///
/// Implements the common pattern of incrementally creating output
/// [`RecordBatch`]es of a specific size from an input stream of
/// [`RecordBatch`]es.
///
/// This is useful after operations such as [`filter`] and [`take`], which
/// often produce smaller batches that we want to coalesce into larger ones
/// for further processing.
///
/// # Motivation
///
/// Implementing the same functionality with [`concat_batches`] has two
/// potential issues:
/// 1. At least 2x peak memory (holding both the input and the output of concat)
/// 2. 2 copies of the data (once to create the output of filter and again to
///    create the output of concat)
///
/// See <https://github.com/apache/arrow-rs/issues/6692> for more discussion
/// of the motivation.
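///
/// For comparison, a sketch of the [`concat_batches`] approach this coalescer
/// replaces (the sizes are illustrative only; note all inputs must be held in
/// memory until the copy completes):
///
/// ```
/// # use arrow_array::record_batch;
/// # use arrow_select::concat::concat_batches;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
/// // peak memory is roughly both inputs plus the concatenated output
/// let combined = concat_batches(&batch1.schema(), [&batch1, &batch2]).unwrap();
/// assert_eq!(combined.num_rows(), 5);
/// ```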
///
/// [`filter`]: crate::filter::filter
/// [`take`]: crate::take::take
/// [`concat_batches`]: crate::concat::concat_batches
///
/// # Example
/// ```
/// use arrow_array::record_batch;
/// use arrow_select::coalesce::BatchCoalescer;
/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
///
/// // Create a `BatchCoalescer` that will produce batches of 4 rows
/// let target_batch_size = 4;
/// let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);
///
/// // push the batches
/// coalescer.push_batch(batch1).unwrap();
/// // only pushed 3 rows (not yet enough to produce a 4 row batch)
/// assert!(coalescer.next_completed_batch().is_none());
/// coalescer.push_batch(batch2).unwrap();
/// // now we have 5 rows, so we can produce a batch
/// let finished = coalescer.next_completed_batch().unwrap();
/// // 4 rows came out (the target batch size is 4)
/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
/// assert_eq!(finished, expected);
///
/// // There is no more input, but the coalescer still has an in-progress batch
/// assert!(coalescer.next_completed_batch().is_none());
/// // We can finish the batch, which will produce the remaining rows
/// coalescer.finish_buffered_batch().unwrap();
/// let expected = record_batch!(("a", Int32, [5])).unwrap();
/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
///
/// // The coalescer is now empty
/// assert!(coalescer.next_completed_batch().is_none());
/// ```
///
/// # Background
///
/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
/// there is fixed processing overhead per batch. This coalescer builds up these
/// larger batches incrementally.
///
/// ```text
/// ┌────────────────────┐
/// │    RecordBatch     │
/// │   num_rows = 100   │
/// └────────────────────┘                 ┌────────────────────┐
///                                        │                    │
/// ┌────────────────────┐    Coalesce     │                    │
/// │                    │     Batches     │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
/// │                    │                 │    RecordBatch     │
/// │                    │                 │   num_rows = 400   │
/// └────────────────────┘                 │                    │
///                                        │                    │
/// ┌────────────────────┐                 │                    │
/// │                    │                 │                    │
/// │    RecordBatch     │                 │                    │
/// │   num_rows = 100   │                 └────────────────────┘
/// │                    │
/// └────────────────────┘
/// ```
///
/// # Notes:
///
/// 1. Output rows are produced in the same order as the input rows
///
/// 2. The output is a sequence of batches, with all but the last containing
///    exactly `target_batch_size` rows.
#[derive(Debug)]
pub struct BatchCoalescer {
    /// The input schema
    schema: SchemaRef,
    /// The target batch size (and thus size for views allocation). This is a
    /// hard limit: the output batch will be exactly `target_batch_size` rows,
    /// rather than possibly being slightly above.
    target_batch_size: usize,
    /// In-progress arrays
    in_progress_arrays: Vec<Box<dyn InProgressArray>>,
    /// Buffered row count. Always less than `target_batch_size`
    buffered_rows: usize,
    /// Completed batches
    completed: VecDeque<RecordBatch>,
    /// Biggest coalesce batch size. See [`Self::with_biggest_coalesce_batch_size`]
    biggest_coalesce_batch_size: Option<usize>,
}

impl BatchCoalescer {
    /// Create a new `BatchCoalescer`
    ///
    /// # Arguments
    /// - `schema` - the schema of the output batches
    /// - `target_batch_size` - the number of rows in each output batch.
    ///   Typical values are `4096` or `8192` rows.
    ///
    pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
        let in_progress_arrays = schema
            .fields()
            .iter()
            .map(|field| create_in_progress_array(field.data_type(), target_batch_size))
            .collect::<Vec<_>>();

        Self {
            schema,
            target_batch_size,
            in_progress_arrays,
            // We will for sure store at least one completed batch
            completed: VecDeque::with_capacity(1),
            buffered_rows: 0,
            biggest_coalesce_batch_size: None,
        }
    }

    /// Set the coalesce batch size limit (default `None`)
    ///
    /// This limit determines when batches should bypass coalescing.
    /// Intuitively, batches that are already large are costly to coalesce and
    /// are efficient enough to process directly without coalescing.
    ///
    /// If `Some(limit)`, batches larger than this limit will bypass coalescing
    /// when there is no buffered data, or when the previously buffered data
    /// already exceeds this limit.
    ///
    /// If `None`, all batches will be coalesced according to the
    /// `target_batch_size`.
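    ///
    /// # Example
    ///
    /// A sketch of the bypass behavior. The tiny sizes here are illustrative
    /// only; real targets are typically thousands of rows:
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
    /// // target 100 row batches, but pass batches larger than 3 rows through
    /// let mut coalescer = BatchCoalescer::new(batch.schema(), 100)
    ///     .with_biggest_coalesce_batch_size(Some(3));
    /// // 4 rows > limit and nothing is buffered, so the batch is emitted
    /// // directly instead of waiting for 100 rows to accumulate
    /// coalescer.push_batch(batch).unwrap();
    /// assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 4);
    /// ```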
    pub fn with_biggest_coalesce_batch_size(mut self, limit: Option<usize>) -> Self {
        self.biggest_coalesce_batch_size = limit;
        self
    }

    /// Get the current biggest coalesce batch size limit
    ///
    /// See [`Self::with_biggest_coalesce_batch_size`] for details
    pub fn biggest_coalesce_batch_size(&self) -> Option<usize> {
        self.biggest_coalesce_batch_size
    }

    /// Set the biggest coalesce batch size limit
    ///
    /// See [`Self::with_biggest_coalesce_batch_size`] for details
    pub fn set_biggest_coalesce_batch_size(&mut self, limit: Option<usize>) {
        self.biggest_coalesce_batch_size = limit;
    }

    /// Return the schema of the output batches
    pub fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    /// Push a batch into the Coalescer after applying a filter
    ///
    /// This is semantically equivalent to calling [`Self::push_batch`]
    /// with the results from [`filter_record_batch`]
    ///
    /// # Example
    /// ```
    /// # use arrow_array::{record_batch, BooleanArray};
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // Apply a filter to each batch to pick the first and last row
    /// let filter = BooleanArray::from(vec![true, false, true]);
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch_with_filter(batch1, &filter).unwrap();
    /// coalescer.push_batch_with_filter(batch2, &filter).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// // filtered out 2 and 5:
    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch_with_filter(
        &mut self,
        batch: RecordBatch,
        filter: &BooleanArray,
    ) -> Result<(), ArrowError> {
        // TODO: optimize this to avoid materializing (copying the results
        // of filter to a new batch)
        let filtered_batch = filter_record_batch(&batch, filter)?;
        self.push_batch(filtered_batch)
    }

    /// Push all the rows from `batch` into the Coalescer
    ///
    /// When the buffered data plus the incoming rows reach `target_batch_size`,
    /// completed batches are generated eagerly and can be retrieved via
    /// [`Self::next_completed_batch()`].
    /// Output batches contain exactly `target_batch_size` rows, so the tail of
    /// the input batch may remain buffered.
    /// Remaining partial data either waits for future input batches or can be
    /// materialized immediately by calling [`Self::finish_buffered_batch()`].
    ///
    /// # Example
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
    /// let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
    /// // create a new Coalescer that targets creating 1000 row batches
    /// let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
    /// coalescer.push_batch(batch1).unwrap();
    /// coalescer.push_batch(batch2).unwrap();
    /// // finish and retrieve the created batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// let completed_batch = coalescer.next_completed_batch().unwrap();
    /// let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> {
        // Large batch bypass optimization:
        // When biggest_coalesce_batch_size is configured and a batch exceeds this limit,
        // we can avoid expensive split-and-merge operations by passing it through directly.
        //
        // IMPORTANT: This optimization is OPTIONAL and only active when biggest_coalesce_batch_size
        // is explicitly set via with_biggest_coalesce_batch_size(Some(limit)).
        // If not set (None), ALL batches follow normal coalescing behavior regardless of size.

        // =============================================================================
        // CASE 1: No buffer + large batch → Direct bypass
        // =============================================================================
        // Example scenario (target_batch_size=1000, biggest_coalesce_batch_size=Some(500)):
        // Input sequence: [600, 1200, 300]
        //
        // With biggest_coalesce_batch_size=Some(500) (optimization enabled):
        //   600  → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //        → output: [600] (bypass, preserves large batch)
        //   1200 → large batch detected! buffered_rows=0 → Case 1: direct bypass
        //        → output: [1200] (bypass, preserves large batch)
        //   300  → normal batch, buffer: [300]
        // Result: [600], [1200], [300] - large batches preserved, mixed sizes

        // =============================================================================
        // CASE 2: Buffer too large + large batch → Flush first, then bypass
        // =============================================================================
        // This case prevents creating extremely large merged batches that would
        // significantly exceed both target_batch_size and biggest_coalesce_batch_size.
        //
        // Example 1: Buffer exceeds limit before large batch arrives
        //   target_batch_size=1000, biggest_coalesce_batch_size=Some(400)
        //   Input: [350, 200, 800]
        //
        //   Step 1: push_batch([350])
        //     → batch_size=350 <= 400, normal path
        //     → buffer: [350], buffered_rows=350
        //
        //   Step 2: push_batch([200])
        //     → batch_size=200 <= 400, normal path
        //     → buffer: [350, 200], buffered_rows=550
        //
        //   Step 3: push_batch([800])
        //     → batch_size=800 > 400, large batch path
        //     → buffered_rows=550 > 400 → Case 2: flush first
        //     → flush: output [550] (combined [350, 200])
        //     → then bypass: output [800]
        //   Result: [550], [800] - buffer flushed to prevent an oversized merge
        //
        // Example 2: Multiple small batches accumulate before a large batch
        //   target_batch_size=1000, biggest_coalesce_batch_size=Some(300)
        //   Input: [150, 100, 80, 900]
        //
        //   Steps 1-3: Accumulate small batches
        //     150 → buffer: [150], buffered_rows=150
        //     100 → buffer: [150, 100], buffered_rows=250
        //     80  → buffer: [150, 100, 80], buffered_rows=330
        //
        //   Step 4: push_batch([900])
        //     → batch_size=900 > 300, large batch path
        //     → buffered_rows=330 > 300 → Case 2: flush first
        //     → flush: output [330] (combined [150, 100, 80])
        //     → then bypass: output [900]
        //   Result: [330], [900] - prevents a merge into [1230], which would be too large

        // =============================================================================
        // CASE 3: Small buffer + large batch → Normal coalescing (no bypass)
        // =============================================================================
        // When the buffer is small enough, we still merge to maintain efficiency
        // Example: target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        //   Input: [300, 1200]
        //
        //   Step 1: push_batch([300])
        //     → batch_size=300 <= 500, normal path
        //     → buffer: [300], buffered_rows=300
        //
        //   Step 2: push_batch([1200])
        //     → batch_size=1200 > 500, large batch path
        //     → buffered_rows=300 <= 500 → Case 3: normal merge
        //     → buffer: [300, 1200] (1500 total)
        //     → 1500 > target_batch_size → split: output [1000], buffer [500]
        //   Result: [1000], [500] - normal split/merge behavior maintained

        // =============================================================================
        // Comparison: Default vs Optimized Behavior
        // =============================================================================
        // target_batch_size=1000, biggest_coalesce_batch_size=Some(500)
        // Input: [600, 1200, 300]
        //
        // DEFAULT BEHAVIOR (biggest_coalesce_batch_size=None):
        //   600  → buffer: [600]
        //   1200 → buffer: [600, 1200] (1800 rows total)
        //        → split: output [1000 rows], buffer [800 rows remaining]
        //   300  → buffer: [800, 300] (1100 rows total)
        //        → split: output [1000 rows], buffer [100 rows remaining]
        // Result: [1000], [1000], [100] - all outputs respect target_batch_size
        //
        // OPTIMIZED BEHAVIOR (biggest_coalesce_batch_size=Some(500)):
        //   600  → Case 1: direct bypass → output: [600]
        //   1200 → Case 1: direct bypass → output: [1200]
        //   300  → normal path → buffer: [300]
        // Result: [600], [1200], [300] - large batches preserved

        // =============================================================================
        // Benefits and Trade-offs
        // =============================================================================
        // Benefits of the optimization:
        // - Large batches stay intact (better for downstream vectorized processing)
        // - Fewer split/merge operations (better CPU performance)
        // - More predictable memory usage patterns
        // - Maintains streaming efficiency while preserving batch boundaries
        //
        // Trade-offs:
        // - Output batch sizes become variable (not always target_batch_size)
        // - May produce smaller partial batches when flushing before large batches
        // - Requires tuning the biggest_coalesce_batch_size parameter for optimal performance

        // TODO: for unsorted batches, we may be able to filter all large batches
        // and coalesce all the small batches together?

        let batch_size = batch.num_rows();

        // Fast path: skip empty batches
        if batch_size == 0 {
            return Ok(());
        }

        // Large batch optimization: bypass coalescing for oversized batches
        if let Some(limit) = self.biggest_coalesce_batch_size {
            if batch_size > limit {
                // Case 1: No buffered data - emit large batch directly
                // Example: [] + [1200] → output [1200], buffer []
                if self.buffered_rows == 0 {
                    self.completed.push_back(batch);
                    return Ok(());
                }

                // Case 2: Buffer too large - flush then emit to avoid oversized merge
                // Example: [850] + [1200] → output [850], then output [1200]
                // This prevents creating batches much larger than both target_batch_size
                // and biggest_coalesce_batch_size, which could cause memory issues
                if self.buffered_rows > limit {
                    self.finish_buffered_batch()?;
                    self.completed.push_back(batch);
                    return Ok(());
                }

                // Case 3: Small buffer - proceed with normal coalescing
                // Example: [300] + [1200] → split and merge normally
                // This ensures small batches still get properly coalesced
                // while allowing some controlled growth beyond the limit
            }
        }

        let (_schema, arrays, mut num_rows) = batch.into_parts();

        // setup input rows
        assert_eq!(arrays.len(), self.in_progress_arrays.len());
        self.in_progress_arrays
            .iter_mut()
            .zip(arrays)
            .for_each(|(in_progress, array)| {
                in_progress.set_source(Some(array));
            });

        // If pushing this batch would exceed the target batch size,
        // finish the current batch and start a new one
        let mut offset = 0;
        while num_rows > (self.target_batch_size - self.buffered_rows) {
            let remaining_rows = self.target_batch_size - self.buffered_rows;
            debug_assert!(remaining_rows > 0);

            // Copy remaining_rows from each array
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, remaining_rows)?;
            }

            self.buffered_rows += remaining_rows;
            offset += remaining_rows;
            num_rows -= remaining_rows;

            self.finish_buffered_batch()?;
        }

        // Add any remaining rows to the buffer
        self.buffered_rows += num_rows;
        if num_rows > 0 {
            for in_progress in self.in_progress_arrays.iter_mut() {
                in_progress.copy_rows(offset, num_rows)?;
            }
        }

        // If we have reached the target batch size, finalize the buffered batch
        if self.buffered_rows >= self.target_batch_size {
            self.finish_buffered_batch()?;
        }

        // clear the in-progress sources (to allow the memory to be freed)
        for in_progress in self.in_progress_arrays.iter_mut() {
            in_progress.set_source(None);
        }

        Ok(())
    }

    /// Returns the number of buffered rows
    pub fn get_buffered_rows(&self) -> usize {
        self.buffered_rows
    }

    /// Concatenates any buffered batches into a single `RecordBatch` and
    /// clears any output buffers
    ///
    /// Normally this is called when the input stream is exhausted, and
    /// we want to finalize the last batch of rows.
    ///
    /// See [`Self::next_completed_batch()`] for the completed batches.
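    ///
    /// # Example
    ///
    /// A minimal sketch: two buffered rows are fewer than the target of 100,
    /// so nothing is emitted until the buffer is explicitly flushed:
    ///
    /// ```
    /// # use arrow_array::record_batch;
    /// # use arrow_select::coalesce::BatchCoalescer;
    /// let batch = record_batch!(("a", Int32, [1, 2])).unwrap();
    /// let mut coalescer = BatchCoalescer::new(batch.schema(), 100);
    /// coalescer.push_batch(batch).unwrap();
    /// // not enough rows yet for a completed batch
    /// assert!(coalescer.next_completed_batch().is_none());
    /// // flush the buffer to produce the final, smaller batch
    /// coalescer.finish_buffered_batch().unwrap();
    /// assert_eq!(coalescer.next_completed_batch().unwrap().num_rows(), 2);
    /// ```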
    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
        if self.buffered_rows == 0 {
            return Ok(());
        }
        let new_arrays = self
            .in_progress_arrays
            .iter_mut()
            .map(|array| array.finish())
            .collect::<Result<Vec<_>, ArrowError>>()?;

        for (array, field) in new_arrays.iter().zip(self.schema.fields().iter()) {
            debug_assert_eq!(array.data_type(), field.data_type());
            debug_assert_eq!(array.len(), self.buffered_rows);
        }

        // SAFETY: each array was created with the correct type and length.
        let batch = unsafe {
            RecordBatch::new_unchecked(Arc::clone(&self.schema), new_arrays, self.buffered_rows)
        };

        self.buffered_rows = 0;
        self.completed.push_back(batch);
        Ok(())
    }

    /// Returns true if there is no buffered data and no completed batches
    pub fn is_empty(&self) -> bool {
        self.buffered_rows == 0 && self.completed.is_empty()
    }

    /// Returns true if there are any completed batches
    pub fn has_completed_batch(&self) -> bool {
        !self.completed.is_empty()
    }

    /// Removes and returns the next completed batch, if any.
    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
        self.completed.pop_front()
    }
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
    macro_rules! instantiate_primitive {
        ($t:ty) => {
            Box::new(InProgressPrimitiveArray::<$t>::new(
                batch_size,
                data_type.clone(),
            ))
        };
    }

    downcast_primitive! {
        // Instantiate InProgressPrimitiveArray for each primitive type
        data_type => (instantiate_primitive),
        DataType::Utf8View => Box::new(InProgressByteViewArray::<StringViewType>::new(batch_size)),
        DataType::BinaryView => {
            Box::new(InProgressByteViewArray::<BinaryViewType>::new(batch_size))
        }
        _ => Box::new(GenericInProgressArray::new()),
    }
}

/// Incrementally builds up arrays
///
/// [`GenericInProgressArray`] is the default implementation that buffers
/// arrays and uses other kernels to concatenate them when finished.
///
/// Some types have specialized implementations of this trait (e.g.,
/// [`StringViewArray`]).
///
/// [`StringViewArray`]: arrow_array::StringViewArray
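///
/// The expected calling sequence, as driven by [`BatchCoalescer::push_batch`]
/// and [`BatchCoalescer::finish_buffered_batch`], is sketched below:
///
/// ```text
/// set_source(Some(array));  // attach the next input array
/// copy_rows(offset, len);   // copy one or more row ranges from the source
/// finish();                 // emit the accumulated rows as an ArrayRef
/// set_source(None);         // release the source so its memory can be freed
/// ```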
trait InProgressArray: std::fmt::Debug + Send + Sync {
    /// Set the source array.
    ///
    /// Calls to [`Self::copy_rows`] will copy rows from this array into the
    /// current in-progress array
    fn set_source(&mut self, source: Option<ArrayRef>);

    /// Copy rows from the current source array into the in-progress array
    ///
    /// The source array is set by [`Self::set_source`].
    ///
    /// Returns an error if the source array is not set
    fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

    /// Finish the currently in-progress array and return it as an `ArrayRef`
    fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::concat::concat_batches;
    use arrow_array::builder::StringViewBuilder;
    use arrow_array::cast::AsArray;
    use arrow_array::{
        BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray,
        TimestampNanosecondArray, UInt32Array,
    };
    use arrow_schema::{DataType, Field, Schema};
    use rand::{Rng, SeedableRng};
    use std::ops::Range;

    #[test]
    fn test_coalesce() {
        let batch = uint32_batch(0..8);
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 10))
            // expected output is exactly 21 rows (except for the final batch)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![21, 21, 21, 17])
            .run();
    }

    #[test]
    fn test_coalesce_one_by_one() {
        let batch = uint32_batch(0..1); // single row input
        Test::new()
            .with_batches(std::iter::repeat_n(batch, 97))
            // expected output is exactly 20 rows (except for the final batch)
            .with_batch_size(20)
            .with_expected_output_sizes(vec![20, 20, 20, 20, 17])
            .run();
    }

    #[test]
    fn test_coalesce_empty() {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        Test::new()
            .with_batches(vec![])
            .with_schema(schema)
            .with_batch_size(21)
            .with_expected_output_sizes(vec![])
            .run();
    }

    #[test]
    fn test_single_large_batch_greater_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000, 1000, 1000, 1000, 96])
            .run();
    }

    #[test]
    fn test_single_large_batch_smaller_than_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(8192)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equal_to_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(4096)
            .with_expected_output_sizes(vec![4096])
            .run();
    }

    #[test]
    fn test_single_large_batch_equally_divisible_in_target() {
        // test a single large batch
        let batch = uint32_batch(0..4096);
        Test::new()
            .with_batch(batch)
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024])
            .run();
    }

    #[test]
    fn test_empty_schema() {
        let schema = Schema::empty();
        let batch = RecordBatch::new_empty(schema.into());
        Test::new()
            .with_batch(batch)
            .with_expected_output_sizes(vec![])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 0.1% selectivity filter
    #[test]
    fn test_coalesce_filtered_001() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.001,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 0.1% means about 80 rows
        // (not exactly 80, as the rows are chosen randomly)
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(15)
            .with_expected_output_sizes(vec![15, 15, 15, 13])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 1% selectivity filter
    #[test]
    fn test_coalesce_filtered_01() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.01,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 1% means about 800 rows
        // (not exactly 800, as the rows are chosen randomly)
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(128)
            .with_expected_output_sizes(vec![128, 128, 128, 128, 128, 128, 15])
            .run();
    }

    /// Coalesce multiple batches, 80k rows, with a 10% selectivity filter
    #[test]
    fn test_coalesce_filtered_1() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 8000,
            selectivity: 0.1,
            seed: 0,
        };

        // add 10 batches of 8000 rows each
        // 80k rows, selecting 10% means about 8000 rows
        // (not exactly 8000, as the rows are chosen randomly)
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..8000))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 840])
            .run();
    }

    /// Coalesce multiple batches, 8k rows, with a 90% selectivity filter
    #[test]
    fn test_coalesce_filtered_90() {
        let mut filter_builder = RandomFilterBuilder {
            num_rows: 800,
            selectivity: 0.90,
            seed: 0,
        };

        // add 10 batches of 800 rows each
        // 8k rows, selecting 90% means about 7200 rows
        // (not exactly 7200, as the rows are chosen randomly)
        let mut test = Test::new();
        for _ in 0..10 {
            test = test
                .with_batch(multi_column_batch(0..800))
                .with_filter(filter_builder.next_filter())
        }
        test.with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 1024, 1024, 1024, 1024, 13])
            .run();
    }

    #[test]
    fn test_coalesce_non_null() {
        Test::new()
            // 4040 rows of uint32
            .with_batch(uint32_batch_non_null(0..3000))
            .with_batch(uint32_batch_non_null(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }

    #[test]
    fn test_utf8_split() {
        Test::new()
            // 4040 rows of utf8 strings in total, split into batches of 1024
            .with_batch(utf8_batch(0..3000))
            .with_batch(utf8_batch(0..1040))
            .with_batch_size(1024)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();
    }

    #[test]
    fn test_string_view_no_views() {
        let output_batches = Test::new()
            // both input batches have no views, so no need to compact
            .with_batch(stringview_batch([Some("foo"), Some("bar")]))
            .with_batch(stringview_batch([Some("baz"), Some("qux")]))
            .with_expected_output_sizes(vec![4])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![],
        );
    }

    #[test]
    fn test_string_view_batch_small_no_compact() {
        // view with only short strings (no buffers) --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("a"), Some("b"), Some("c")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 0);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(gc_array, vec![]);
    }

    #[test]
    fn test_string_view_batch_large_no_compact() {
        // view with large strings (has buffers) but full --> no need to compact
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![1000])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);
        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); // no compaction

        expect_buffer_layout(
            gc_array,
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_batch_small_with_buffers_no_compact() {
        // view with buffers but only short views
        let short_strings = std::iter::repeat(Some("SmallString"));
        let long_strings = std::iter::once(Some("This string is longer than 12 bytes"));
        // 20 short strings, then a long one
        let values = short_strings.take(20).chain(long_strings);
        let batch = stringview_batch_repeated(1000, values)
            // take only 10 short strings (no long ones)
            .slice(5, 10);
        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![10])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 1); // input has one buffer
        assert_eq!(gc_array.data_buffers().len(), 0); // output has no buffers as only short strings remain
    }

    #[test]
    fn test_string_view_batch_large_slice_compact() {
        // view with large strings (has buffers) that is only partially used
        // --> needs compaction
        let batch = stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")])
            // slice only 22 rows, so most of the buffer is not used
            .slice(11, 22);

        let output_batches = Test::new()
            .with_batch(batch.clone())
            .with_batch_size(1000)
            .with_expected_output_sizes(vec![22])
            .run();

        let array = col_as_string_view("c0", &batch);
        let gc_array = col_as_string_view("c0", output_batches.first().unwrap());
        assert_eq!(array.data_buffers().len(), 5);

        expect_buffer_layout(
            gc_array,
            vec![ExpectedLayout {
                len: 770,
                capacity: 8192,
            }],
        );
    }

    #[test]
    fn test_string_view_mixed() {
        let large_view_batch =
            stringview_batch_repeated(1000, [Some("This string is longer than 12 bytes")]);
        let small_view_batch = stringview_batch_repeated(1000, [Some("SmallString")]);
        let mixed_batch = stringview_batch_repeated(
            1000,
            [Some("This string is longer than 12 bytes"), Some("Small")],
        );
        let mixed_batch_nulls = stringview_batch_repeated(
            1000,
            [
                Some("This string is longer than 12 bytes"),
                Some("Small"),
                None,
            ],
        );

        // Several batches with mixed inline / non-inline views,
        // 4k rows in total
        let output_batches = Test::new()
            .with_batch(large_view_batch.clone())
            .with_batch(small_view_batch)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch_nulls)
            // this batch needs to be compacted (less than 1/2 full)
            .with_batch(large_view_batch.slice(10, 20))
            .with_batch(mixed_batch)
            .with_expected_output_sizes(vec![1024, 1024, 1024, 968])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 2240,
                    capacity: 8192,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_compact() {
        // 200 rows alternating long (28 bytes) and short (≤12 bytes) strings.
        // Only the 100 long strings go into data buffers: 100 × 28 = 2800 bytes.
        let batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        let output_batches = Test::new()
            // The first allocated buffer is 8KB.
            // Appending 10 batches of 2800 bytes each uses 2800 * 10 = 28000
            // bytes, spread across 8KB, 16KB and 32KB buffers.
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![2000]) // 2000 rows total (10 batches of 200)
            .run();

        // expect a nice even distribution of buffers
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8176,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16380,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 3444,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_many_small_boundary() {
        // The strings are designed to exactly fit into buffers that are powers of 2 long
        let batch = stringview_batch_repeated(100, [Some("This string is a power of two=32")]);
        let output_batches = Test::new()
            .with_batches(std::iter::repeat_n(batch, 20))
            .with_batch_size(900)
            .with_expected_output_sizes(vec![900, 900, 200])
            .run();

        // expect each buffer to be entirely full except the last one
        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8192,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16384,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 4224,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_string_view_large_small() {
        // The long strings are 28 bytes each, and half of the 200 rows are
        // long, so each batch buffers 100 * 28 = 2800 bytes of string data
        let mixed_batch = stringview_batch_repeated(
            200,
            [Some("This string is 28 bytes long"), Some("small string")],
        );
        // These strings aren't copied; this array has an 8k buffer
        let all_large = stringview_batch_repeated(
            50,
            [Some(
                "This buffer has only large strings in it so there are no buffer copies",
            )],
        );

        let output_batches = Test::new()
            // The first allocated buffer is 8KB.
            // The six mixed batches (2800 bytes each) plus the large-string
            // batches fill an 8KB buffer, a 16KB buffer, and part of a 32KB buffer.
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch(mixed_batch.clone())
            .with_batch(all_large.clone())
            .with_batch_size(8000)
            .with_expected_output_sizes(vec![1400])
            .run();

        expect_buffer_layout(
            col_as_string_view("c0", output_batches.first().unwrap()),
            vec![
                ExpectedLayout {
                    len: 8190,
                    capacity: 8192,
                },
                ExpectedLayout {
                    len: 16366,
                    capacity: 16384,
                },
                ExpectedLayout {
                    len: 6244,
                    capacity: 32768,
                },
            ],
        );
    }

    #[test]
    fn test_binary_view() {
        let values: Vec<Option<&[u8]>> = vec![
            Some(b"foo"),
            None,
            Some(b"A longer string that is more than 12 bytes"),
        ];

        let binary_view =
            BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
        let batch =
            RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();

        Test::new()
            .with_batch(batch.clone())
            .with_batch(batch.clone())
            .with_batch_size(512)
            .with_expected_output_sizes(vec![512, 512, 512, 464])
            .run();
    }

    #[derive(Debug, Clone, PartialEq)]
    struct ExpectedLayout {
        len: usize,
        capacity: usize,
    }

    /// Asserts that the buffer layout of the specified StringViewArray matches the expected layout
    fn expect_buffer_layout(array: &StringViewArray, expected: Vec<ExpectedLayout>) {
        let actual = array
            .data_buffers()
            .iter()
            .map(|b| ExpectedLayout {
                len: b.len(),
                capacity: b.capacity(),
            })
            .collect::<Vec<_>>();

        assert_eq!(
            actual, expected,
            "Expected buffer layout {expected:#?} but got {actual:#?}"
        );
    }

    /// Test for [`BatchCoalescer`]
    ///
    /// Pushes the input batches to the coalescer and verifies that the resulting
    /// batches have the expected number of rows and contents.
    #[derive(Debug, Clone)]
    struct Test {
        /// Batches to feed to the coalescer.
        input_batches: Vec<RecordBatch>,
        /// Filters to apply to the corresponding input batches.
        ///
        /// If there is no filter for an input batch, the batch will be
        /// pushed as is.
        filters: Vec<BooleanArray>,
        /// The schema. If not provided, the first batch's schema is used.
        schema: Option<SchemaRef>,
        /// Expected output sizes of the resulting batches
        expected_output_sizes: Vec<usize>,
        /// target batch size (defaults to 1024)
        target_batch_size: usize,
    }

    impl Default for Test {
        fn default() -> Self {
            Self {
                input_batches: vec![],
                filters: vec![],
                schema: None,
                expected_output_sizes: vec![],
                target_batch_size: 1024,
            }
        }
    }

    impl Test {
        fn new() -> Self {
            Self::default()
        }

        /// Set the target batch size
        fn with_batch_size(mut self, target_batch_size: usize) -> Self {
            self.target_batch_size = target_batch_size;
            self
        }

        /// Extend the input batches with `batch`
        fn with_batch(mut self, batch: RecordBatch) -> Self {
            self.input_batches.push(batch);
            self
        }

        /// Extend the filters with `filter`
        fn with_filter(mut self, filter: BooleanArray) -> Self {
            self.filters.push(filter);
            self
        }

        /// Extends the input batches with `batches`
        fn with_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
            self.input_batches.extend(batches);
            self
        }

        /// Specifies the schema for the test
        fn with_schema(mut self, schema: SchemaRef) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Extends the expected output sizes with `sizes`
        fn with_expected_output_sizes(mut self, sizes: impl IntoIterator<Item = usize>) -> Self {
            self.expected_output_sizes.extend(sizes);
            self
        }

        /// Runs the test -- see documentation on [`Test`] for details
        ///
        /// Returns the resulting output batches
        fn run(self) -> Vec<RecordBatch> {
            let expected_output = self.expected_output();
            let schema = self.schema();

            let Self {
                input_batches,
                filters,
                schema: _,
                target_batch_size,
                expected_output_sizes,
            } = self;

            let had_input = input_batches.iter().any(|b| b.num_rows() > 0);

            let mut coalescer = BatchCoalescer::new(Arc::clone(&schema), target_batch_size);

            // feed the input batches and filters to the coalescer
            let mut filters = filters.into_iter();
            for batch in input_batches {
                if let Some(filter) = filters.next() {
                    coalescer.push_batch_with_filter(batch, &filter).unwrap();
                } else {
                    coalescer.push_batch(batch).unwrap();
                }
            }
            assert_eq!(schema, coalescer.schema());

            if had_input {
                assert!(!coalescer.is_empty(), "Coalescer should not be empty");
            } else {
                assert!(coalescer.is_empty(), "Coalescer should be empty");
            }

            coalescer.finish_buffered_batch().unwrap();
            if had_input {
                assert!(
                    coalescer.has_completed_batch(),
                    "Coalescer should have completed batches"
                );
            }

            let mut output_batches = vec![];
            while let Some(batch) = coalescer.next_completed_batch() {
                output_batches.push(batch);
            }

            // make sure we got the expected number of output batches and content
            let mut starting_idx = 0;
            let actual_output_sizes: Vec<usize> =
                output_batches.iter().map(|b| b.num_rows()).collect();
            assert_eq!(
                expected_output_sizes, actual_output_sizes,
                "Unexpected number of rows in output batches\n\
                Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
            );
            let iter = expected_output_sizes
                .iter()
                .zip(output_batches.iter())
                .enumerate();

            for (i, (expected_size, batch)) in iter {
                // compare the contents of the batch after normalization (comparing
                // directly with `==` would also compare the underlying memory layout)
                let expected_batch = expected_output.slice(starting_idx, *expected_size);
                let expected_batch = normalize_batch(expected_batch);
                let batch = normalize_batch(batch.clone());
                assert_eq!(
                    expected_batch, batch,
                    "Unexpected content in batch {i}:\
                    \n\nExpected:\n{expected_batch:#?}\n\nActual:\n{batch:#?}"
                );
                starting_idx += *expected_size;
            }
            output_batches
        }

        /// Return the expected output schema. If not overridden by `with_schema`, it
        /// returns the schema of the first input batch.
        fn schema(&self) -> SchemaRef {
            self.schema
                .clone()
                .unwrap_or_else(|| Arc::clone(&self.input_batches[0].schema()))
        }

        /// Returns the expected output as a single `RecordBatch`
        fn expected_output(&self) -> RecordBatch {
            let schema = self.schema();
            if self.filters.is_empty() {
                return concat_batches(&schema, &self.input_batches).unwrap();
            }

            let mut filters = self.filters.iter();
            let filtered_batches = self
                .input_batches
                .iter()
                .map(|batch| {
                    if let Some(filter) = filters.next() {
                        filter_record_batch(batch, filter).unwrap()
                    } else {
                        batch.clone()
                    }
                })
                .collect::<Vec<_>>();
            concat_batches(&schema, &filtered_batches).unwrap()
        }
    }

    /// Return a RecordBatch with a UInt32Array over the specified range where
    /// every third value is null.
    fn uint32_batch(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, true)]));

        let array = UInt32Array::from_iter(range.map(|i| if i % 3 == 0 { None } else { Some(i) }));
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

    /// Return a RecordBatch with a UInt32Array with no nulls over the specified range
    fn uint32_batch_non_null(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]));

        let array = UInt32Array::from_iter_values(range);
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

    /// Return a RecordBatch with a StringArray with values `value0`, `value1`, ...
    /// where every third value is `None`.
    fn utf8_batch(range: Range<u32>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Utf8, true)]));

        let array = StringArray::from_iter(range.map(|i| {
            if i % 3 == 0 {
                None
            } else {
                Some(format!("value{i}"))
            }
        }));

        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

    /// Return a RecordBatch with a StringViewArray with (only) the specified values
    fn stringview_batch<'a>(values: impl IntoIterator<Item = Option<&'a str>>) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "c0",
            DataType::Utf8View,
            false,
        )]));

        let array = StringViewArray::from_iter(values);
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

    /// Return a RecordBatch with a StringViewArray of `num_rows` created by
    /// repeating `values` over and over.
    fn stringview_batch_repeated<'a>(
        num_rows: usize,
        values: impl IntoIterator<Item = Option<&'a str>>,
    ) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "c0",
            DataType::Utf8View,
            true,
        )]));

        // Repeat the values to a total of num_rows
        let values: Vec<_> = values.into_iter().collect();
        let values_iter = std::iter::repeat(values.iter())
            .flatten()
            .cloned()
            .take(num_rows);

        let mut builder = StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
        for val in values_iter {
            builder.append_option(val);
        }

        let array = builder.finish();
        RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]).unwrap()
    }

    /// Return a four-column RecordBatch (Int64, Utf8View, Utf8, and
    /// Timestamp) with one row for each value in `range`
    fn multi_column_batch(range: Range<i32>) -> RecordBatch {
        let int64_array = Int64Array::from_iter(range.clone().map(|v| {
            if v % 5 == 0 {
                None
            } else {
                Some(v as i64)
            }
        }));
        let string_view_array = StringViewArray::from_iter(range.clone().map(|v| {
            if v % 5 == 0 {
                None
            } else if v % 7 == 0 {
                Some(format!("This is a string longer than 12 bytes{v}"))
            } else {
                Some(format!("Short {v}"))
            }
        }));
        let string_array = StringArray::from_iter(range.clone().map(|v| {
            if v % 11 == 0 {
                None
            } else {
                Some(format!("Value {v}"))
            }
        }));
        let timestamp_array = TimestampNanosecondArray::from_iter(range.map(|v| {
            if v % 3 == 0 {
                None
            } else {
                Some(v as i64 * 1000) // simulate a nanosecond timestamp
            }
        }))
        .with_timezone("America/New_York");

        RecordBatch::try_from_iter(vec![
            ("int64", Arc::new(int64_array) as ArrayRef),
            ("stringview", Arc::new(string_view_array) as ArrayRef),
            ("string", Arc::new(string_array) as ArrayRef),
            ("timestamp", Arc::new(timestamp_array) as ArrayRef),
        ])
        .unwrap()
    }

    /// Builds boolean filter arrays that select each row independently with
    /// probability `selectivity`.
    ///
    /// For example, a `selectivity` of 0.1 keeps roughly 10% of the rows
    /// (filtering out about 90%).
    #[derive(Debug)]
    struct RandomFilterBuilder {
        num_rows: usize,
        selectivity: f64,
        /// seed for the random number generator, incremented by one each time
        /// `next_filter` is called
        seed: u64,
    }

    impl RandomFilterBuilder {
        /// Build the next filter with the current seed and increment the seed
        /// by one.
        fn next_filter(&mut self) -> BooleanArray {
            assert!(self.selectivity >= 0.0 && self.selectivity <= 1.0);
            let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed);
            self.seed += 1;
            BooleanArray::from_iter(
                (0..self.num_rows)
                    .map(|_| rng.random_bool(self.selectivity))
                    .map(Some),
            )
        }
    }

    /// Returns the named column as a StringViewArray
    fn col_as_string_view<'b>(name: &str, batch: &'b RecordBatch) -> &'b StringViewArray {
        batch
            .column_by_name(name)
            .expect("column not found")
            .as_string_view_opt()
            .expect("column is not a string view")
    }

    /// Normalize the `RecordBatch` so that the memory layout is consistent
    /// (e.g. StringArray is compacted).
    fn normalize_batch(batch: RecordBatch) -> RecordBatch {
        // Only need to normalize StringViews (as == also tests for memory layout)
        let (schema, mut columns, row_count) = batch.into_parts();

        for column in columns.iter_mut() {
            let Some(string_view) = column.as_string_view_opt() else {
                continue;
            };

            // Re-create the StringViewArray to ensure the memory layout is
            // consistent
            let mut builder = StringViewBuilder::new();
            for s in string_view.iter() {
                builder.append_option(s);
            }
            // Update the column with the new StringViewArray
            *column = Arc::new(builder.finish());
        }

        let options = RecordBatchOptions::new().with_row_count(Some(row_count));
        RecordBatch::try_new_with_options(schema, columns, &options).unwrap()
    }

    /// Helper function to create a test batch with the specified number of rows
    fn create_test_batch(num_rows: usize) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
        let array = Int32Array::from_iter_values(0..num_rows as i32);
        RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
    }

    #[test]
    fn test_biggest_coalesce_batch_size_none_default() {
        // Test that the default behavior (None) coalesces all batches
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );

        // Push a large batch (1000 rows) - should be coalesced normally
        let large_batch = create_test_batch(1000);
        coalescer.push_batch(large_batch).unwrap();

        // Should produce multiple batches of the target size (100)
        let mut output_batches = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            output_batches.push(batch);
        }

        coalescer.finish_buffered_batch().unwrap();
        while let Some(batch) = coalescer.next_completed_batch() {
            output_batches.push(batch);
        }

        // Should have 10 batches of 100 rows each
        assert_eq!(output_batches.len(), 10);
        for batch in output_batches {
            assert_eq!(batch.num_rows(), 100);
        }
    }

    #[test]
    fn test_biggest_coalesce_batch_size_bypass_large_batch() {
        // Test that batches larger than biggest_coalesce_batch_size bypass coalescing
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(500));

        // Push a large batch (1000 rows) - should bypass coalescing
        let large_batch = create_test_batch(1000);
        coalescer.push_batch(large_batch.clone()).unwrap();

        // Should have one completed batch immediately (the original large batch)
        assert!(coalescer.has_completed_batch());
        let output_batch = coalescer.next_completed_batch().unwrap();
        assert_eq!(output_batch.num_rows(), 1000);

        // Should be no more completed batches
        assert!(!coalescer.has_completed_batch());
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_coalesce_small_batch() {
        // Test that batches smaller than biggest_coalesce_batch_size are coalesced normally
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(500));

        // Push small batches that should be coalesced
        let small_batch = create_test_batch(50);
        coalescer.push_batch(small_batch.clone()).unwrap();

        // Should not have a completed batch yet (only 50 rows, target is 100)
        assert!(!coalescer.has_completed_batch());
        assert_eq!(coalescer.get_buffered_rows(), 50);

        // Push another small batch
        coalescer.push_batch(small_batch).unwrap();

        // Now should have a completed batch (100 rows total)
        assert!(coalescer.has_completed_batch());
        let output_batch = coalescer.next_completed_batch().unwrap();
        assert_eq!(output_batch.num_rows(), 100);

        assert_eq!(coalescer.get_buffered_rows(), 0);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_equal_boundary() {
        // Test behavior when the batch size equals biggest_coalesce_batch_size
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(500));

        // Push a batch exactly equal to the limit
        let boundary_batch = create_test_batch(500);
        coalescer.push_batch(boundary_batch).unwrap();

        // Should be coalesced (not bypass) since it's equal, not greater
        let mut output_count = 0;
        while coalescer.next_completed_batch().is_some() {
            output_count += 1;
        }

        coalescer.finish_buffered_batch().unwrap();
        while coalescer.next_completed_batch().is_some() {
            output_count += 1;
        }

        // Should have 5 batches of 100 rows each
        assert_eq!(output_count, 5);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_first_large_then_consecutive_bypass() {
        // Test the new consecutive large batch bypass behavior
        // Pattern: small batches -> first large batch (coalesced) -> consecutive large batches (bypass)
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        let small_batch = create_test_batch(50);

        // Push small batch first to create buffered data
        coalescer.push_batch(small_batch).unwrap();
        assert_eq!(coalescer.get_buffered_rows(), 50);
        assert!(!coalescer.has_completed_batch());

        // Push first large batch - should go through normal coalescing due to buffered data
        let large_batch1 = create_test_batch(250);
        coalescer.push_batch(large_batch1).unwrap();

        // 50 + 250 = 300 -> 3 complete batches of 100, 0 rows buffered
        let mut completed_batches = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            completed_batches.push(batch);
        }
        assert_eq!(completed_batches.len(), 3);
        assert_eq!(coalescer.get_buffered_rows(), 0);

        // Now push consecutive large batches - they should bypass
        let large_batch2 = create_test_batch(300);
        let large_batch3 = create_test_batch(400);

        // Push second large batch - should bypass since it's consecutive and buffer is empty
        coalescer.push_batch(large_batch2).unwrap();
        assert!(coalescer.has_completed_batch());
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 300); // bypassed with original size
        assert_eq!(coalescer.get_buffered_rows(), 0);

        // Push third large batch - should also bypass
        coalescer.push_batch(large_batch3).unwrap();
        assert!(coalescer.has_completed_batch());
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 400); // bypassed with original size
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }
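
    // Taken together, the bypass tests are consistent with the following
    // decision rule (a sketch with illustrative names, not necessarily the
    // actual implementation):
    //
    //     let is_large = limit.is_some_and(|limit| batch.num_rows() > limit);
    //     let bypass = is_large && (buffered_rows == 0 || previous_was_large);
    //
    // When a bypass happens while rows are still buffered (the consecutive
    // case), the buffered rows are flushed as their own batch first.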

    #[test]
    fn test_biggest_coalesce_batch_size_empty_batch() {
        // Test that empty batches don't trigger the bypass logic
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(50));

        let empty_batch = create_test_batch(0);
        coalescer.push_batch(empty_batch).unwrap();

        // Empty batch should be handled normally (no effect)
        assert!(!coalescer.has_completed_batch());
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_with_buffered_data_no_bypass() {
        // Test that when there is buffered data, large batches do NOT bypass (unless consecutive)
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        // Add some buffered data first
        let small_batch = create_test_batch(30);
        coalescer.push_batch(small_batch.clone()).unwrap();
        coalescer.push_batch(small_batch).unwrap();
        assert_eq!(coalescer.get_buffered_rows(), 60);

        // Push large batch that would normally bypass, but shouldn't because buffered_rows > 0
        let large_batch = create_test_batch(250);
        coalescer.push_batch(large_batch).unwrap();

        // The large batch should be processed through normal coalescing logic
        // Total: 60 (buffered) + 250 (new) = 310 rows
        // Output: 3 complete batches of 100 rows each, 10 rows remain buffered

        let mut completed_batches = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            completed_batches.push(batch);
        }

        assert_eq!(completed_batches.len(), 3);
        for batch in &completed_batches {
            assert_eq!(batch.num_rows(), 100);
        }
        assert_eq!(coalescer.get_buffered_rows(), 10);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_zero_limit() {
        // Test edge case where limit is 0 (all batches bypass when no buffered data)
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(0));

        // Even a 1-row batch should bypass when there's no buffered data
        let tiny_batch = create_test_batch(1);
        coalescer.push_batch(tiny_batch).unwrap();

        assert!(coalescer.has_completed_batch());
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 1);
    }
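
    // With a limit of 0, every non-empty batch counts as "large", so
    // Some(0) effectively makes the coalescer a pass-through whenever the
    // bypass conditions hold (empty buffer, or a consecutive large batch).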

    #[test]
    fn test_biggest_coalesce_batch_size_bypass_only_when_no_buffer() {
        // Test that a non-consecutive large batch bypasses only when buffered_rows == 0
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        // First, push a large batch with no buffered data - should bypass
        let large_batch = create_test_batch(300);
        coalescer.push_batch(large_batch.clone()).unwrap();

        assert!(coalescer.has_completed_batch());
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 300); // bypassed
        assert_eq!(coalescer.get_buffered_rows(), 0);

        // Now add some buffered data
        let small_batch = create_test_batch(50);
        coalescer.push_batch(small_batch).unwrap();
        assert_eq!(coalescer.get_buffered_rows(), 50);

        // Push the same large batch again - should NOT bypass this time, since
        // there is buffered data and the small batch reset the consecutive tracking
        coalescer.push_batch(large_batch).unwrap();

        // Should process through normal coalescing: 50 + 300 = 350 rows
        // Output: 3 complete batches of 100 rows, 50 rows buffered
        let mut completed_batches = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            completed_batches.push(batch);
        }

        assert_eq!(completed_batches.len(), 3);
        for batch in &completed_batches {
            assert_eq!(batch.num_rows(), 100);
        }
        assert_eq!(coalescer.get_buffered_rows(), 50);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_consecutive_large_batches_scenario() {
        // Test a stream of mixed batch sizes: 20, 20, 30, 700, 600, 700, 900, 700, 600
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            1000,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(500));

        // Push small batches first
        coalescer.push_batch(create_test_batch(20)).unwrap();
        coalescer.push_batch(create_test_batch(20)).unwrap();
        coalescer.push_batch(create_test_batch(30)).unwrap();

        assert_eq!(coalescer.get_buffered_rows(), 70);
        assert!(!coalescer.has_completed_batch());

        // Push first large batch (700) - should coalesce due to buffered data
        coalescer.push_batch(create_test_batch(700)).unwrap();

        // 70 + 700 = 770 rows, not enough for 1000, so all stay buffered
        assert_eq!(coalescer.get_buffered_rows(), 770);
        assert!(!coalescer.has_completed_batch());

        // Push second large batch (600) - should bypass since previous was large
        coalescer.push_batch(create_test_batch(600)).unwrap();

        // Should flush buffer (770 rows) and bypass the 600
        let mut outputs = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            outputs.push(batch);
        }
        assert_eq!(outputs.len(), 2); // one flushed buffer batch (770) + one bypassed (600)
        assert_eq!(outputs[0].num_rows(), 770);
        assert_eq!(outputs[1].num_rows(), 600);
        assert_eq!(coalescer.get_buffered_rows(), 0);

        // Push remaining large batches - should all bypass
        let remaining_batches = [700, 900, 700, 600];
        for &size in &remaining_batches {
            coalescer.push_batch(create_test_batch(size)).unwrap();

            assert!(coalescer.has_completed_batch());
            let output = coalescer.next_completed_batch().unwrap();
            assert_eq!(output.num_rows(), size);
            assert_eq!(coalescer.get_buffered_rows(), 0);
        }
    }
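
    // The scenario above exercises the flush-then-bypass path. Assuming the
    // rule sketched earlier, the push of the 600-row batch proceeds as:
    //
    //     push 600 (previous batch was large, 770 rows buffered)
    //       -> flush the buffer as a 770-row batch
    //       -> emit the 600-row batch unchanged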

    #[test]
    fn test_biggest_coalesce_batch_size_truly_consecutive_large_bypass() {
        // Test truly consecutive large batches that should all bypass
        // This test ensures buffer is completely empty between large batches
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        // Push consecutive large batches with no prior buffered data
        let large_batches = vec![
            create_test_batch(300),
            create_test_batch(400),
            create_test_batch(350),
            create_test_batch(500),
        ];

        let mut all_outputs = vec![];

        for (i, large_batch) in large_batches.into_iter().enumerate() {
            let expected_size = large_batch.num_rows();

            // Buffer should be empty before each large batch
            assert_eq!(
                coalescer.get_buffered_rows(),
                0,
                "Buffer should be empty before batch {}",
                i
            );

            coalescer.push_batch(large_batch).unwrap();

            // Each large batch should bypass and produce exactly one output batch
            assert!(
                coalescer.has_completed_batch(),
                "Should have completed batch after pushing batch {}",
                i
            );

            let output = coalescer.next_completed_batch().unwrap();
            assert_eq!(
                output.num_rows(),
                expected_size,
                "Batch {} should have bypassed with original size",
                i
            );

            // Should be no more batches and buffer should be empty
            assert!(
                !coalescer.has_completed_batch(),
                "Should have no more completed batches after batch {}",
                i
            );
            assert_eq!(
                coalescer.get_buffered_rows(),
                0,
                "Buffer should be empty after batch {}",
                i
            );

            all_outputs.push(output);
        }

        // Verify we got exactly 4 output batches with original sizes
        assert_eq!(all_outputs.len(), 4);
        assert_eq!(all_outputs[0].num_rows(), 300);
        assert_eq!(all_outputs[1].num_rows(), 400);
        assert_eq!(all_outputs[2].num_rows(), 350);
        assert_eq!(all_outputs[3].num_rows(), 500);
    }

    #[test]
    fn test_biggest_coalesce_batch_size_reset_consecutive_on_small_batch() {
        // Test that small batches reset the consecutive large batch tracking
        let mut coalescer = BatchCoalescer::new(
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)])),
            100,
        );
        coalescer.set_biggest_coalesce_batch_size(Some(200));

        // Push first large batch - should bypass (no buffered data)
        coalescer.push_batch(create_test_batch(300)).unwrap();
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 300);

        // Push second large batch - should bypass (consecutive)
        coalescer.push_batch(create_test_batch(400)).unwrap();
        let output = coalescer.next_completed_batch().unwrap();
        assert_eq!(output.num_rows(), 400);

        // Push small batch - resets consecutive tracking
        coalescer.push_batch(create_test_batch(50)).unwrap();
        assert_eq!(coalescer.get_buffered_rows(), 50);

        // Push large batch again - should NOT bypass due to buffered data
        coalescer.push_batch(create_test_batch(350)).unwrap();

        // Should coalesce: 50 + 350 = 400 -> 4 complete batches of 100
        let mut outputs = vec![];
        while let Some(batch) = coalescer.next_completed_batch() {
            outputs.push(batch);
        }
        assert_eq!(outputs.len(), 4);
        for batch in outputs {
            assert_eq!(batch.num_rows(), 100);
        }
        assert_eq!(coalescer.get_buffered_rows(), 0);
    }
}