Module batching


Append records batching stream.

StreamClient::append_session accepts a stream of AppendInputs, so the caller must first group records into AppendRecordBatches. This module batches AppendRecords automatically, closing each batch when a size limit is reached or the linger duration elapses.
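To make the size-limit half of that idea concrete, here is a minimal, std-only sketch. It is not the SDK's implementation: `Limits` and `batch_by_limits` are hypothetical names, record size is approximated by body length in bytes, and the linger timer is left out entirely because it needs an async runtime.

```rust
/// Hypothetical limits for this sketch; not the SDK's types or defaults.
#[derive(Debug, Clone, Copy)]
pub struct Limits {
    pub max_records: usize,
    pub max_bytes: usize,
}

/// Groups record bodies into batches, closing the current batch whenever
/// adding the next record would exceed either limit.
/// A single record larger than `max_bytes` still gets a batch of its own.
pub fn batch_by_limits(records: &[&str], limits: Limits) -> Vec<Vec<String>> {
    let mut batches = Vec::new();
    let mut current: Vec<String> = Vec::new();
    let mut current_bytes = 0usize;

    for record in records {
        let size = record.len();
        let would_overflow = current.len() + 1 > limits.max_records
            || current_bytes + size > limits.max_bytes;

        // Flush the batch built so far before starting a new one.
        if would_overflow && !current.is_empty() {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }

        current.push(record.to_string());
        current_bytes += size;
    }

    // Emit whatever is left once the input is exhausted.
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}
```

A real batching stream would also flush a partially filled batch once the linger duration passes without new records, which trades a little latency for larger batches.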

The stream attaches the provided fencing token (if any) to every batch and auto-increments the matching sequence number from one batch to the next for concurrency control.
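The bookkeeping described here can be sketched as follows. This is an illustration under stated assumptions, not the SDK's code: `BatchInput` and `stamp_batches` are hypothetical names, and the sketch assumes the matching sequence number advances by the number of records in each emitted batch.

```rust
/// Hypothetical stand-in for one batched append input.
#[derive(Debug, PartialEq)]
pub struct BatchInput {
    pub records: Vec<String>,
    pub fencing_token: Option<String>,
    pub match_seq_num: Option<u64>,
}

/// Stamps every batch with the same fencing token and a running
/// matching sequence number.
/// Assumption: the expected sequence number for the next batch is the
/// previous one plus the number of records just emitted.
pub fn stamp_batches(
    batches: Vec<Vec<String>>,
    fencing_token: Option<String>,
    mut match_seq_num: Option<u64>,
) -> Vec<BatchInput> {
    batches
        .into_iter()
        .map(|records| {
            let input = BatchInput {
                records,
                fencing_token: fencing_token.clone(),
                match_seq_num,
            };
            // Advance the counter only when the caller asked for matching.
            if let Some(next) = match_seq_num.as_mut() {
                *next += input.records.len() as u64;
            }
            input
        })
        .collect()
}
```

With a starting value of 10 and batches of 2, 1, and 3 records, the stamped values in this sketch are 10, 12, and 13. If no starting value is given, every batch is left without a matching sequence number.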

Example usage

let append_records_stream = futures::stream::iter([
    AppendRecord::new("hello").unwrap(),
    AppendRecord::new("bye").unwrap(),
    // ...
]);

let batching_opts = AppendRecordsBatchingOpts::new()
    .with_max_batch_records(100)
    .with_linger(Duration::from_millis(100))
    .with_fencing_token(Some(fencing_token))
    .with_match_seq_num(Some(10));

let batching_stream = AppendRecordsBatchingStream::new(
    append_records_stream,
    batching_opts,
);

let ack_stream = stream_client.append_session(batching_stream).await?;

Structs

AppendRecordsBatchingOpts
Options to configure the batching scheme for AppendRecordsBatchingStream.
AppendRecordsBatchingStream
Wrap a stream of AppendRecords as a stream of AppendInputs by smartly batching records together based on batching options provided by AppendRecordsBatchingOpts.