Struct BatchCoalescer

Source
pub struct BatchCoalescer { /* private fields */ }
Expand description

Concatenate multiple RecordBatches

Implements the common pattern of incrementally creating output RecordBatches of a specific size from an input stream of RecordBatches.

This is useful after operations such as filter and take that produce smaller batches, and we want to coalesce them into larger batches for further processing.

See: https://github.com/apache/arrow-rs/issues/6692

§Example

use arrow_array::record_batch;
use arrow_select::coalesce::{BatchCoalescer};
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();

// Create a `BatchCoalescer` that will produce batches with at least 4 rows
let target_batch_size = 4;
let mut coalescer = BatchCoalescer::new(batch1.schema(), 4);

// push the batches
coalescer.push_batch(batch1).unwrap();
// only pushed 3 rows (not yet 4, enough to produce a batch)
assert!(coalescer.next_completed_batch().is_none());
coalescer.push_batch(batch2).unwrap();
// now we have 5 rows, so we can produce a batch
let finished = coalescer.next_completed_batch().unwrap();
// 4 rows came out (target batch size is 4)
let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
assert_eq!(finished, expected);

// Have no more input, but still have an in-progress batch
assert!(coalescer.next_completed_batch().is_none());
// We can finish the batch, which will produce the remaining rows
coalescer.finish_buffered_batch().unwrap();
let expected = record_batch!(("a", Int32, [5])).unwrap();
assert_eq!(coalescer.next_completed_batch().unwrap(), expected);

// The coalescer is now empty
assert!(coalescer.next_completed_batch().is_none());

§Background

Generally speaking, larger RecordBatches are more efficient to process than smaller RecordBatches (until the CPU cache is exceeded) because there is fixed processing overhead per batch. This coalescer builds up these larger batches incrementally.

┌────────────────────┐
│    RecordBatch     │
│   num_rows = 100   │
└────────────────────┘                 ┌────────────────────┐
                                       │                    │
┌────────────────────┐     Coalesce    │                    │
│                    │      Batches    │                    │
│    RecordBatch     │                 │                    │
│   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
│                    │                 │    RecordBatch     │
│                    │                 │   num_rows = 400   │
└────────────────────┘                 │                    │
                                       │                    │
┌────────────────────┐                 │                    │
│                    │                 │                    │
│    RecordBatch     │                 │                    │
│   num_rows = 100   │                 └────────────────────┘
│                    │
└────────────────────┘

§Notes:

  1. Output rows are produced in the same order as the input rows

  2. The output is a sequence of batches, with all but the last being at exactly target_batch_size rows.

Implementations§

Source§

impl BatchCoalescer

Source

pub fn new(schema: Arc<Schema>, batch_size: usize) -> BatchCoalescer

Create a new BatchCoalescer

§Arguments
  • schema - the schema of the output batches
  • batch_size - the number of rows in each output batch. Typical values are 4096 or 8192 rows.
Source

pub fn schema(&self) -> Arc<Schema>

Return the schema of the output batches

Source

pub fn push_batch_with_filter( &mut self, batch: RecordBatch, filter: &BooleanArray, ) -> Result<(), ArrowError>

Push a batch into the Coalescer after applying a filter

This is semantically equivalent of calling Self::push_batch with the results from filter_record_batch

§Example
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
// Apply a filter to each batch to pick the first and last row
let filter = BooleanArray::from(vec![true, false, true]);
// create a new Coalescer that targets creating 1000 row batches
let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
coalescer.push_batch_with_filter(batch1, &filter);
coalescer.push_batch_with_filter(batch2, &filter);
// finsh and retrieve the created batch
coalescer.finish_buffered_batch().unwrap();
let completed_batch = coalescer.next_completed_batch().unwrap();
// filtered out 2 and 5:
let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
assert_eq!(completed_batch, expected_batch);
Source

pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError>

Push all the rows from batch into the Coalescer

See Self::next_completed_batch() to retrieve any completed batches.

§Example
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
// create a new Coalescer that targets creating 1000 row batches
let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
coalescer.push_batch(batch1);
coalescer.push_batch(batch2);
// finsh and retrieve the created batch
coalescer.finish_buffered_batch().unwrap();
let completed_batch = coalescer.next_completed_batch().unwrap();
let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
assert_eq!(completed_batch, expected_batch);
Source

pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError>

Concatenates any buffered batches into a single RecordBatch and clears any output buffers

Normally this is called when the input stream is exhausted, and we want to finalize the last batch of rows.

See Self::next_completed_batch() for the completed batches.

Source

pub fn is_empty(&self) -> bool

Returns true if there is any buffered data

Source

pub fn has_completed_batch(&self) -> bool

Returns true if there are any completed batches

Source

pub fn next_completed_batch(&mut self) -> Option<RecordBatch>

Returns the next completed batch, if any

Trait Implementations§

Source§

impl Debug for BatchCoalescer

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> AlignerFor<1> for T

Source§

type Aligner = AlignTo1<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<1024> for T

Source§

type Aligner = AlignTo1024<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<128> for T

Source§

type Aligner = AlignTo128<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<16> for T

Source§

type Aligner = AlignTo16<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<16384> for T

Source§

type Aligner = AlignTo16384<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<2> for T

Source§

type Aligner = AlignTo2<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<2048> for T

Source§

type Aligner = AlignTo2048<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<256> for T

Source§

type Aligner = AlignTo256<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<32> for T

Source§

type Aligner = AlignTo32<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<32768> for T

Source§

type Aligner = AlignTo32768<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<4> for T

Source§

type Aligner = AlignTo4<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<4096> for T

Source§

type Aligner = AlignTo4096<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<512> for T

Source§

type Aligner = AlignTo512<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<64> for T

Source§

type Aligner = AlignTo64<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<8> for T

Source§

type Aligner = AlignTo8<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> AlignerFor<8192> for T

Source§

type Aligner = AlignTo8192<T>

The AlignTo* type which aligns Self to ALIGNMENT.
Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<S> ROExtAcc for S

Source§

fn f_get<F>(&self, offset: FieldOffset<S, F, Aligned>) -> &F

Gets a reference to a field, determined by offset. Read more
Source§

fn f_get_mut<F>(&mut self, offset: FieldOffset<S, F, Aligned>) -> &mut F

Gets a muatble reference to a field, determined by offset. Read more
Source§

fn f_get_ptr<F, A>(&self, offset: FieldOffset<S, F, A>) -> *const F

Gets a const pointer to a field, the field is determined by offset. Read more
Source§

fn f_get_mut_ptr<F, A>(&mut self, offset: FieldOffset<S, F, A>) -> *mut F

Gets a mutable pointer to a field, determined by offset. Read more
Source§

impl<S> ROExtOps<Aligned> for S

Source§

fn f_replace<F>(&mut self, offset: FieldOffset<S, F, Aligned>, value: F) -> F

Replaces a field (determined by offset) with value, returning the previous value of the field. Read more
Source§

fn f_swap<F>(&mut self, offset: FieldOffset<S, F, Aligned>, right: &mut S)

Swaps a field (determined by offset) with the same field in right. Read more
Source§

fn f_get_copy<F>(&self, offset: FieldOffset<S, F, Aligned>) -> F
where F: Copy,

Gets a copy of a field (determined by offset). The field is determined by offset. Read more
Source§

impl<S> ROExtOps<Unaligned> for S

Source§

fn f_replace<F>(&mut self, offset: FieldOffset<S, F, Unaligned>, value: F) -> F

Replaces a field (determined by offset) with value, returning the previous value of the field. Read more
Source§

fn f_swap<F>(&mut self, offset: FieldOffset<S, F, Unaligned>, right: &mut S)

Swaps a field (determined by offset) with the same field in right. Read more
Source§

fn f_get_copy<F>(&self, offset: FieldOffset<S, F, Unaligned>) -> F
where F: Copy,

Gets a copy of a field (determined by offset). The field is determined by offset. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> SelfOps for T
where T: ?Sized,

Source§

fn eq_id(&self, other: &Self) -> bool

Compares the address of self with the address of other. Read more
Source§

fn piped<F, U>(self, f: F) -> U
where F: FnOnce(Self) -> U, Self: Sized,

Emulates the pipeline operator, allowing method syntax in more places. Read more
Source§

fn piped_ref<'a, F, U>(&'a self, f: F) -> U
where F: FnOnce(&'a Self) -> U,

The same as piped except that the function takes &Self Useful for functions that take &Self instead of Self. Read more
Source§

fn piped_mut<'a, F, U>(&'a mut self, f: F) -> U
where F: FnOnce(&'a mut Self) -> U,

The same as piped, except that the function takes &mut Self. Useful for functions that take &mut Self instead of Self.
Source§

fn mutated<F>(self, f: F) -> Self
where F: FnOnce(&mut Self), Self: Sized,

Mutates self using a closure taking self by mutable reference, passing it along the method chain. Read more
Source§

fn observe<F>(self, f: F) -> Self
where F: FnOnce(&Self), Self: Sized,

Observes the value of self, passing it along unmodified. Useful in long method chains. Read more
Source§

fn into_<T>(self) -> T
where Self: Into<T>,

Performs a conversion with Into. using the turbofish .into_::<_>() syntax. Read more
Source§

fn as_ref_<T>(&self) -> &T
where Self: AsRef<T>, T: ?Sized,

Performs a reference to reference conversion with AsRef, using the turbofish .as_ref_::<_>() syntax. Read more
Source§

fn as_mut_<T>(&mut self) -> &mut T
where Self: AsMut<T>, T: ?Sized,

Performs a mutable reference to mutable reference conversion with AsMut, using the turbofish .as_mut_::<_>() syntax. Read more
Source§

fn drop_(self)
where Self: Sized,

Drops self using method notation. Alternative to std::mem::drop. Read more
Source§

impl<This> TransmuteElement for This
where This: ?Sized,

Source§

unsafe fn transmute_element<T>(self) -> Self::TransmutedPtr
where Self: CanTransmuteElement<T>,

Transmutes the element type of this pointer.. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> TypeIdentity for T
where T: ?Sized,

Source§

type Type = T

This is always Self.
Source§

fn into_type(self) -> Self::Type
where Self: Sized, Self::Type: Sized,

Converts a value back to the original type.
Source§

fn as_type(&self) -> &Self::Type

Converts a reference back to the original type.
Source§

fn as_type_mut(&mut self) -> &mut Self::Type

Converts a mutable reference back to the original type.
Source§

fn into_type_box(self: Box<Self>) -> Box<Self::Type>

Converts a box back to the original type.
Source§

fn into_type_arc(this: Arc<Self>) -> Arc<Self::Type>

Converts an Arc back to the original type. Read more
Source§

fn into_type_rc(this: Rc<Self>) -> Rc<Self::Type>

Converts an Rc back to the original type. Read more
Source§

fn from_type(this: Self::Type) -> Self
where Self: Sized, Self::Type: Sized,

Converts a value back to the original type.
Source§

fn from_type_ref(this: &Self::Type) -> &Self

Converts a reference back to the original type.
Source§

fn from_type_mut(this: &mut Self::Type) -> &mut Self

Converts a mutable reference back to the original type.
Source§

fn from_type_box(this: Box<Self::Type>) -> Box<Self>

Converts a box back to the original type.
Source§

fn from_type_arc(this: Arc<Self::Type>) -> Arc<Self>

Converts an Arc back to the original type.
Source§

fn from_type_rc(this: Rc<Self::Type>) -> Rc<Self>

Converts an Rc back to the original type.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,

Source§

impl<T> Ungil for T
where T: Send,