Module multistream_batch::channel::multi_buf_batch

This module provides MultiBufBatchChannel, which buffers items into multiple internal batches based on the batch stream key until one of the batches is ready, and then provides these items in one go, along with the batch stream key, using a Drain iterator.


Collect batches of items from two streams by reaching different individual batch limits and by using the Flush command.

use multistream_batch::channel::multi_buf_batch::MultiBufBatchChannel;
use multistream_batch::channel::multi_buf_batch::Command::*;
use std::time::Duration;
use assert_matches::assert_matches;

// Create producer thread and batcher with maximum size of 4 items (for each stream) and
// maximum batch duration since first received item of 200 ms.
let mut batch = MultiBufBatchChannel::with_producer_thread(4, Duration::from_millis(200), 10, |sender| {
    // Send a sequence of `Append` commands with integer stream key and item value
    sender.send(Append(1, 1)).unwrap();
    sender.send(Append(0, 1)).unwrap();
    sender.send(Append(1, 2)).unwrap();
    sender.send(Append(0, 2)).unwrap();
    sender.send(Append(1, 3)).unwrap();
    sender.send(Append(0, 3)).unwrap();
    sender.send(Append(1, 4)).unwrap();
    // At this point batch with stream key `1` should have reached its capacity of 4 items
    sender.send(Append(0, 4)).unwrap();
    // At this point batch with stream key `0` should have reached its capacity of 4 items

    // Send some more to buffer up for next batch
    sender.send(Append(0, 5)).unwrap();
    sender.send(Append(1, 5)).unwrap();
    sender.send(Append(1, 6)).unwrap();
    sender.send(Append(0, 6)).unwrap();

    // Introduce delay to trigger maximum duration timeout

    // Send items that will be flushed by `Flush` command
    sender.send(Append(0, 7)).unwrap();
    sender.send(Append(1, 7)).unwrap();
    sender.send(Append(1, 8)).unwrap();
    sender.send(Append(0, 8)).unwrap();
    // Flush outstanding items for batch with stream key `1` and `0`

    // Last buffered up items will be flushed automatically when this thread exits
    sender.send(Append(0, 9)).unwrap();
    sender.send(Append(1, 9)).unwrap();
    sender.send(Append(1, 10)).unwrap();
    sender.send(Append(0, 10)).unwrap();
    // Exiting closure will shutdown the producer thread

// Batches flushed due to individual batch size limit
assert_matches!(batch.next(), Ok((1, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])

assert_matches!(batch.next(), Ok((0, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [1, 2, 3, 4])

// Batches flushed due to duration limit
assert_matches!(batch.next(), Ok((0, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])

assert_matches!(batch.next(), Ok((1, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [5, 6])

// Batches flushed by sending `Flush` command starting from batch with stream key `1`
assert_matches!(batch.next(), Ok((1, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])

assert_matches!(batch.next(), Ok((0, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [7, 8])

// Batches flushed by dropping sender (thread exit)
assert_matches!(batch.next(), Ok((0, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])

assert_matches!(batch.next(), Ok((1, drain)) =>
    assert_eq!(drain.collect::<Vec<_>>().as_slice(), [9, 10])


pub use crate::multi_buf_batch::Stats;



Collects items into multiple batches based on stream key. A batch may become ready after collecting max_size number of items or after max_duration has elapsed since the first item was appended to the batch.



Commands that can be sent to MultiBufBatchChannel via the Sender endpoint.