pub struct BatchCoalescer { /* private fields */ }
Concatenate multiple RecordBatches
Implements the common pattern of incrementally creating output RecordBatches of a specific size from an input stream of RecordBatches.
This is useful after operations such as filter and take that produce smaller batches, and we want to coalesce them into larger batches for further processing.
See: https://github.com/apache/arrow-rs/issues/6692
§Example
use arrow_array::record_batch;
use arrow_select::coalesce::{BatchCoalescer};
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
// Create a `BatchCoalescer` that will produce batches of exactly 4 rows
// (only the final batch, flushed by `finish_buffered_batch`, may be smaller)
let target_batch_size = 4;
let mut coalescer = BatchCoalescer::new(batch1.schema(), target_batch_size);
// push the batches
coalescer.push_batch(batch1).unwrap();
// only pushed 3 rows (fewer than the 4 needed to produce a batch)
assert!(coalescer.next_completed_batch().is_none());
coalescer.push_batch(batch2).unwrap();
// now we have 5 rows, so we can produce a batch
let finished = coalescer.next_completed_batch().unwrap();
// 4 rows came out (target batch size is 4)
let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
assert_eq!(finished, expected);
// Have no more input, but still have an in-progress batch
assert!(coalescer.next_completed_batch().is_none());
// We can finish the batch, which will produce the remaining rows
coalescer.finish_buffered_batch().unwrap();
let expected = record_batch!(("a", Int32, [5])).unwrap();
assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
// The coalescer is now empty
assert!(coalescer.next_completed_batch().is_none());
§Background
Generally speaking, larger RecordBatches are more efficient to process than smaller RecordBatches (until the CPU cache is exceeded) because there is fixed processing overhead per batch. This coalescer builds up these larger batches incrementally.
┌────────────────────┐
│    RecordBatch     │
│   num_rows = 100   │
└────────────────────┘                 ┌────────────────────┐
                                       │                    │
┌────────────────────┐     Coalesce    │                    │
│                    │      Batches    │                    │
│    RecordBatch     │                 │                    │
│   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
│                    │                 │    RecordBatch     │
│                    │                 │   num_rows = 400   │
└────────────────────┘                 │                    │
                                       │                    │
┌────────────────────┐                 │                    │
│                    │                 │                    │
│    RecordBatch     │                 │                    │
│   num_rows = 100   │                 └────────────────────┘
│                    │
└────────────────────┘
§Notes:
- Output rows are produced in the same order as the input rows.
- The output is a sequence of batches, with all but the last having exactly target_batch_size rows.
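The two notes above can be illustrated with a small standard-library model. This is only a sketch of the observable behaviour, not how arrow-rs implements it: rows are plain `i32` values, a "batch" is a `Vec<i32>`, and the names `ToyCoalescer` and `coalesce_all` are hypothetical and exist only for this illustration.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a coalescer (hypothetical, not part of arrow-rs).
struct ToyCoalescer {
    target: usize,
    buffered: Vec<i32>,
    completed: VecDeque<Vec<i32>>,
}

impl ToyCoalescer {
    fn new(target: usize) -> Self {
        assert!(target > 0, "target batch size must be non-zero");
        Self { target, buffered: Vec::new(), completed: VecDeque::new() }
    }

    /// Append rows in input order; each time exactly `target` rows are
    /// buffered, move them to the queue of completed batches.
    fn push_batch(&mut self, batch: Vec<i32>) {
        for row in batch {
            self.buffered.push(row);
            if self.buffered.len() == self.target {
                let full = std::mem::take(&mut self.buffered);
                self.completed.push_back(full);
            }
        }
    }

    /// Flush any remaining rows as a final, possibly smaller, batch.
    fn finish_buffered_batch(&mut self) {
        if !self.buffered.is_empty() {
            let rest = std::mem::take(&mut self.buffered);
            self.completed.push_back(rest);
        }
    }

    /// Pop the oldest completed batch, if any.
    fn next_completed_batch(&mut self) -> Option<Vec<i32>> {
        self.completed.pop_front()
    }
}

/// Drive the toy coalescer over a whole input stream (hypothetical helper).
fn coalesce_all(input: Vec<Vec<i32>>, target: usize) -> Vec<Vec<i32>> {
    let mut coalescer = ToyCoalescer::new(target);
    let mut out = Vec::new();
    for batch in input {
        coalescer.push_batch(batch);
        // Drain completed batches as soon as they are available.
        while let Some(done) = coalescer.next_completed_batch() {
            out.push(done);
        }
    }
    // Input exhausted: flush the partial batch, then drain once more.
    coalescer.finish_buffered_batch();
    while let Some(done) = coalescer.next_completed_batch() {
        out.push(done);
    }
    out
}
```

With a target of 4, the inputs `[1, 2, 3]`, `[4, 5]` and `[6, 7, 8, 9, 10]` come out as `[1, 2, 3, 4]`, `[5, 6, 7, 8]` and `[9, 10]`: row order is preserved and only the last batch is short.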
Implementations§
impl BatchCoalescer
pub fn new(schema: Arc<Schema>, batch_size: usize) -> BatchCoalescer
Create a new BatchCoalescer
§Arguments
- schema - the schema of the output batches
- batch_size - the number of rows in each output batch. Typical values are 4096 or 8192 rows.
pub fn push_batch_with_filter(
    &mut self,
    batch: RecordBatch,
    filter: &BooleanArray,
) -> Result<(), ArrowError>
Push a batch into the Coalescer after applying a filter.
This is semantically equivalent to calling Self::push_batch with the results from filter_record_batch.
§Example
use arrow_array::{record_batch, BooleanArray};
use arrow_select::coalesce::BatchCoalescer;
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
// Apply a filter to each batch to pick the first and last row
let filter = BooleanArray::from(vec![true, false, true]);
// create a new Coalescer that targets creating 1000 row batches
let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
coalescer.push_batch_with_filter(batch1, &filter).unwrap();
coalescer.push_batch_with_filter(batch2, &filter).unwrap();
// finish and retrieve the created batch
coalescer.finish_buffered_batch().unwrap();
let completed_batch = coalescer.next_completed_batch().unwrap();
// filtered out 2 and 5:
let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
assert_eq!(completed_batch, expected_batch);
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError>
Push all the rows from batch into the Coalescer.
See Self::next_completed_batch() to retrieve any completed batches.
§Example
use arrow_array::record_batch;
use arrow_select::coalesce::BatchCoalescer;
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
// create a new Coalescer that targets creating 1000 row batches
let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
coalescer.push_batch(batch1).unwrap();
coalescer.push_batch(batch2).unwrap();
// finish and retrieve the created batch
coalescer.finish_buffered_batch().unwrap();
let completed_batch = coalescer.next_completed_batch().unwrap();
let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
assert_eq!(completed_batch, expected_batch);
pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError>
Concatenates any buffered batches into a single RecordBatch and clears any output buffers.
Normally this is called when the input stream is exhausted, and we want to finalize the last batch of rows.
See Self::next_completed_batch() for the completed batches.
pub fn has_completed_batch(&self) -> bool
Returns true if there are any completed batches
pub fn next_completed_batch(&mut self) -> Option<RecordBatch>
Returns the next completed batch, if any
Trait Implementations§
Auto Trait Implementations§
impl Freeze for BatchCoalescer
impl !RefUnwindSafe for BatchCoalescer
impl Send for BatchCoalescer
impl Sync for BatchCoalescer
impl Unpin for BatchCoalescer
impl !UnwindSafe for BatchCoalescer